mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 10:49:06 +01:00
Close connections cleanly when starting key agreement.
This commit is contained in:
@@ -2,6 +2,7 @@ package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.InputStream;
|
||||
@@ -11,9 +12,9 @@ public interface SyncSessionFactory {
|
||||
|
||||
SyncSession createIncomingSession(ContactId c, InputStream in);
|
||||
|
||||
SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency,
|
||||
StreamWriter streamWriter);
|
||||
SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
||||
int maxLatency, StreamWriter streamWriter);
|
||||
|
||||
SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
|
||||
int maxIdleTime, StreamWriter streamWriter);
|
||||
SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
|
||||
int maxLatency, int maxIdleTime, StreamWriter streamWriter);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package org.briarproject.bramble.api.sync.event;
|
||||
|
||||
import org.briarproject.bramble.api.event.Event;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* An event that is broadcast when all sync connections using a given
|
||||
* transport should be closed.
|
||||
*/
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public class CloseSyncConnectionsEvent extends Event {
|
||||
|
||||
private final TransportId transportId;
|
||||
|
||||
public CloseSyncConnectionsEvent(TransportId transportId) {
|
||||
this.transportId = transportId;
|
||||
}
|
||||
|
||||
public TransportId getTransportId() {
|
||||
return transportId;
|
||||
}
|
||||
}
|
||||
@@ -130,8 +130,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
TransportConnectionWriter w) throws IOException {
|
||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||
w.getOutputStream(), ctx);
|
||||
ContactId c = requireNonNull(ctx.getContactId());
|
||||
return syncSessionFactory.createSimplexOutgoingSession(c,
|
||||
return syncSessionFactory.createSimplexOutgoingSession(
|
||||
requireNonNull(ctx.getContactId()), ctx.getTransportId(),
|
||||
w.getMaxLatency(), streamWriter);
|
||||
}
|
||||
|
||||
@@ -139,8 +139,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
TransportConnectionWriter w) throws IOException {
|
||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||
w.getOutputStream(), ctx);
|
||||
ContactId c = requireNonNull(ctx.getContactId());
|
||||
return syncSessionFactory.createDuplexOutgoingSession(c,
|
||||
return syncSessionFactory.createDuplexOutgoingSession(
|
||||
requireNonNull(ctx.getContactId()), ctx.getTransportId(),
|
||||
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package org.briarproject.bramble.plugin.bluetooth;
|
||||
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
|
||||
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@@ -18,6 +19,7 @@ import javax.inject.Inject;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.plugin.BluetoothConstants.ID;
|
||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
|
||||
@NotNullByDefault
|
||||
@@ -27,6 +29,7 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
|
||||
private static final Logger LOG =
|
||||
getLogger(BluetoothConnectionLimiterImpl.class.getName());
|
||||
|
||||
private final EventBus eventBus;
|
||||
private final Clock clock;
|
||||
|
||||
private final Object lock = new Object();
|
||||
@@ -38,24 +41,18 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
|
||||
private int connectionLimit = 1;
|
||||
|
||||
@Inject
|
||||
BluetoothConnectionLimiterImpl(Clock clock) {
|
||||
BluetoothConnectionLimiterImpl(EventBus eventBus, Clock clock) {
|
||||
this.eventBus = eventBus;
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void keyAgreementStarted() {
|
||||
List<DuplexTransportConnection> close;
|
||||
synchronized (lock) {
|
||||
keyAgreementInProgress = true;
|
||||
close = new ArrayList<>(connections.size());
|
||||
for (ConnectionRecord rec : connections) close.add(rec.connection);
|
||||
connections.clear();
|
||||
}
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info("Key agreement started, closing " + close.size() +
|
||||
" connections");
|
||||
}
|
||||
for (DuplexTransportConnection conn : close) tryToClose(conn);
|
||||
LOG.info("Key agreement started");
|
||||
eventBus.broadcast(new CloseSyncConnectionsEvent(ID));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -11,6 +11,7 @@ import org.briarproject.bramble.api.event.EventListener;
|
||||
import org.briarproject.bramble.api.lifecycle.IoExecutor;
|
||||
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.Offer;
|
||||
@@ -18,6 +19,7 @@ import org.briarproject.bramble.api.sync.Request;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
|
||||
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
|
||||
@@ -71,6 +73,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
private final EventBus eventBus;
|
||||
private final Clock clock;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final int maxLatency, maxIdleTime;
|
||||
private final StreamWriter streamWriter;
|
||||
private final SyncRecordWriter recordWriter;
|
||||
@@ -86,14 +89,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
|
||||
int maxIdleTime, StreamWriter streamWriter,
|
||||
SyncRecordWriter recordWriter) {
|
||||
EventBus eventBus, Clock clock, ContactId contactId,
|
||||
TransportId transportId, int maxLatency, int maxIdleTime,
|
||||
StreamWriter streamWriter, SyncRecordWriter recordWriter) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.clock = clock;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.maxIdleTime = maxIdleTime;
|
||||
this.streamWriter = streamWriter;
|
||||
@@ -223,6 +227,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
} else if (e instanceof LifecycleEvent) {
|
||||
LifecycleEvent l = (LifecycleEvent) e;
|
||||
if (l.getLifecycleState() == STOPPING) interrupt();
|
||||
} else if (e instanceof CloseSyncConnectionsEvent) {
|
||||
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
|
||||
if (c.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,11 +11,13 @@ import org.briarproject.bramble.api.event.EventListener;
|
||||
import org.briarproject.bramble.api.lifecycle.IoExecutor;
|
||||
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -56,6 +58,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
private final Executor dbExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final int maxLatency;
|
||||
private final StreamWriter streamWriter;
|
||||
private final SyncRecordWriter recordWriter;
|
||||
@@ -65,12 +68,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, ContactId contactId, int maxLatency,
|
||||
StreamWriter streamWriter, SyncRecordWriter recordWriter) {
|
||||
EventBus eventBus, ContactId contactId, TransportId transportId,
|
||||
int maxLatency, StreamWriter streamWriter,
|
||||
SyncRecordWriter recordWriter) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.streamWriter = streamWriter;
|
||||
this.recordWriter = recordWriter;
|
||||
@@ -123,6 +128,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
} else if (e instanceof LifecycleEvent) {
|
||||
LifecycleEvent l = (LifecycleEvent) e;
|
||||
if (l.getLifecycleState() == STOPPING) interrupt();
|
||||
} else if (e instanceof CloseSyncConnectionsEvent) {
|
||||
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
|
||||
if (c.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordReader;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordReaderFactory;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
@@ -53,22 +54,23 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncSession createSimplexOutgoingSession(ContactId c,
|
||||
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
||||
int maxLatency, StreamWriter streamWriter) {
|
||||
OutputStream out = streamWriter.getOutputStream();
|
||||
SyncRecordWriter recordWriter =
|
||||
recordWriterFactory.createRecordWriter(out);
|
||||
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
|
||||
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
|
||||
maxLatency, streamWriter, recordWriter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
|
||||
int maxIdleTime, StreamWriter streamWriter) {
|
||||
public SyncSession createDuplexOutgoingSession(ContactId c,
|
||||
TransportId t, int maxLatency, int maxIdleTime,
|
||||
StreamWriter streamWriter) {
|
||||
OutputStream out = streamWriter.getOutputStream();
|
||||
SyncRecordWriter recordWriter =
|
||||
recordWriterFactory.createRecordWriter(out);
|
||||
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
|
||||
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
|
||||
maxLatency, maxIdleTime, streamWriter, recordWriter);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package org.briarproject.bramble.plugin.bluetooth;
|
||||
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
||||
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
||||
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||
import org.jmock.Expectations;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.briarproject.bramble.plugin.bluetooth.BluetoothConnectionLimiter.STABILITY_PERIOD_MS;
|
||||
@@ -14,6 +16,7 @@ import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase {
|
||||
|
||||
private final EventBus eventBus = context.mock(EventBus.class);
|
||||
private final Clock clock = context.mock(Clock.class);
|
||||
private final DuplexTransportConnection conn =
|
||||
context.mock(DuplexTransportConnection.class);
|
||||
@@ -24,11 +27,15 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase {
|
||||
|
||||
private final long now = System.currentTimeMillis();
|
||||
|
||||
private BluetoothConnectionLimiter limiter;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
limiter = new BluetoothConnectionLimiterImpl(eventBus, clock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLimiterAllowsOneOutgoingConnection() {
|
||||
BluetoothConnectionLimiter limiter =
|
||||
new BluetoothConnectionLimiterImpl(clock);
|
||||
|
||||
expectGetCurrentTime(now);
|
||||
assertTrue(limiter.canOpenContactConnection());
|
||||
|
||||
@@ -41,9 +48,6 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase {
|
||||
|
||||
@Test
|
||||
public void testLimiterAllowsSecondIncomingConnection() throws Exception {
|
||||
BluetoothConnectionLimiter limiter =
|
||||
new BluetoothConnectionLimiterImpl(clock);
|
||||
|
||||
expectGetCurrentTime(now);
|
||||
assertTrue(limiter.canOpenContactConnection());
|
||||
|
||||
@@ -64,11 +68,7 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLimiterAllowsSecondOutgoingConnectionWhenFirstIsStable()
|
||||
throws Exception {
|
||||
BluetoothConnectionLimiter limiter =
|
||||
new BluetoothConnectionLimiterImpl(clock);
|
||||
|
||||
public void testLimiterAllowsSecondOutgoingConnectionWhenFirstIsStable() {
|
||||
expectGetCurrentTime(now);
|
||||
assertTrue(limiter.canOpenContactConnection());
|
||||
|
||||
@@ -87,9 +87,6 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase {
|
||||
@Test
|
||||
public void testLimiterAllowsThirdIncomingConnectionWhenFirstTwoAreStable()
|
||||
throws Exception {
|
||||
BluetoothConnectionLimiter limiter =
|
||||
new BluetoothConnectionLimiterImpl(clock);
|
||||
|
||||
expectGetCurrentTime(now);
|
||||
assertTrue(limiter.canOpenContactConnection());
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
@@ -23,6 +24,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.test.TestUtils.getContactId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getMessage;
|
||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getTransportId;
|
||||
|
||||
public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
|
||||
@@ -36,14 +38,15 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
|
||||
private final Executor dbExecutor = new ImmediateExecutor();
|
||||
private final ContactId contactId = getContactId();
|
||||
private final TransportId transportId = getTransportId();
|
||||
private final Message message = getMessage(new GroupId(getRandomId()));
|
||||
private final MessageId messageId = message.getId();
|
||||
|
||||
@Test
|
||||
public void testNothingToSend() throws Exception {
|
||||
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
|
||||
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter,
|
||||
recordWriter);
|
||||
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter);
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
Transaction noMsgTxn = new Transaction(null, false);
|
||||
|
||||
@@ -76,8 +79,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
public void testSomethingToSend() throws Exception {
|
||||
Ack ack = new Ack(singletonList(messageId));
|
||||
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
|
||||
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter,
|
||||
recordWriter);
|
||||
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter);
|
||||
Transaction ackTxn = new Transaction(null, false);
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
Transaction msgTxn = new Transaction(null, false);
|
||||
|
||||
Reference in New Issue
Block a user