diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index 216f29398..7dca2407f 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -2,6 +2,7 @@ package org.briarproject.bramble.api.sync; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.InputStream; @@ -11,9 +12,9 @@ public interface SyncSessionFactory { SyncSession createIncomingSession(ContactId c, InputStream in); - SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency, - StreamWriter streamWriter); + SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, + int maxLatency, StreamWriter streamWriter); - SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, StreamWriter streamWriter); + SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, + int maxLatency, int maxIdleTime, StreamWriter streamWriter); } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/event/CloseSyncConnectionsEvent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/event/CloseSyncConnectionsEvent.java new file mode 100644 index 000000000..92d2b3c19 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/event/CloseSyncConnectionsEvent.java @@ -0,0 +1,26 @@ +package org.briarproject.bramble.api.sync.event; + +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; + +import javax.annotation.concurrent.Immutable; + +/** + * An event that is broadcast when all sync connections using a given + * transport should be closed. + */ +@Immutable +@NotNullByDefault +public class CloseSyncConnectionsEvent extends Event { + + private final TransportId transportId; + + public CloseSyncConnectionsEvent(TransportId transportId) { + this.transportId = transportId; + } + + public TransportId getTransportId() { + return transportId; + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index ba45b6e1a..35fd07b7f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -130,8 +130,8 @@ class ConnectionManagerImpl implements ConnectionManager { TransportConnectionWriter w) throws IOException { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); - ContactId c = requireNonNull(ctx.getContactId()); - return syncSessionFactory.createSimplexOutgoingSession(c, + return syncSessionFactory.createSimplexOutgoingSession( + requireNonNull(ctx.getContactId()), ctx.getTransportId(), w.getMaxLatency(), streamWriter); } @@ -139,8 +139,8 @@ class ConnectionManagerImpl implements ConnectionManager { TransportConnectionWriter w) throws IOException { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); - ContactId c = requireNonNull(ctx.getContactId()); - return syncSessionFactory.createDuplexOutgoingSession(c, + return syncSessionFactory.createDuplexOutgoingSession( + requireNonNull(ctx.getContactId()), ctx.getTransportId(), w.getMaxLatency(), w.getMaxIdleTime(), streamWriter); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImpl.java index f2e7ea534..b5ddc5a89 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImpl.java @@ -1,11 +1,12 @@ package org.briarproject.bramble.plugin.bluetooth; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent; import org.briarproject.bramble.api.system.Clock; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -18,6 +19,7 @@ import javax.inject.Inject; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.plugin.BluetoothConstants.ID; import static org.briarproject.bramble.util.LogUtils.logException; @NotNullByDefault @@ -27,6 +29,7 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter { private static final Logger LOG = getLogger(BluetoothConnectionLimiterImpl.class.getName()); + private final EventBus eventBus; private final Clock clock; private final Object lock = new Object(); @@ -38,24 +41,18 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter { private int connectionLimit = 1; @Inject - BluetoothConnectionLimiterImpl(Clock clock) { + BluetoothConnectionLimiterImpl(EventBus eventBus, Clock clock) { + this.eventBus = eventBus; this.clock = clock; } @Override public void keyAgreementStarted() { - List close; synchronized (lock) { keyAgreementInProgress = true; - close = new ArrayList<>(connections.size()); - for (ConnectionRecord rec : connections) close.add(rec.connection); - connections.clear(); } - if (LOG.isLoggable(INFO)) { - LOG.info("Key agreement started, closing " + close.size() + - " connections"); - } - for (DuplexTransportConnection conn : close) tryToClose(conn); + LOG.info("Key agreement started"); + eventBus.broadcast(new CloseSyncConnectionsEvent(ID)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index 0aa1a8985..552d9b347 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -11,6 +11,7 @@ import org.briarproject.bramble.api.event.EventListener; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Offer; @@ -18,6 +19,7 @@ import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.Versions; +import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent; import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; import org.briarproject.bramble.api.sync.event.MessageRequestedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent; @@ -71,6 +73,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private final EventBus eventBus; private final Clock clock; private final ContactId contactId; + private final TransportId transportId; private final int maxLatency, maxIdleTime; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; @@ -86,14 +89,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private volatile boolean interrupted = false; DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, - EventBus eventBus, Clock clock, ContactId contactId, int maxLatency, - int maxIdleTime, StreamWriter streamWriter, - SyncRecordWriter recordWriter) { + EventBus eventBus, Clock clock, ContactId contactId, + TransportId transportId, int maxLatency, int maxIdleTime, + StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; this.clock = clock; this.contactId = contactId; + this.transportId = transportId; this.maxLatency = maxLatency; this.maxIdleTime = maxIdleTime; this.streamWriter = streamWriter; @@ -223,6 +227,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener { } else if (e instanceof LifecycleEvent) { LifecycleEvent l = (LifecycleEvent) e; if (l.getLifecycleState() == STOPPING) interrupt(); + } else if (e instanceof CloseSyncConnectionsEvent) { + CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e; + if (c.getTransportId().equals(transportId)) interrupt(); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 9e1e62909..2697a2cfc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -11,11 +11,13 @@ import org.briarproject.bramble.api.event.EventListener; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.Versions; +import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; @@ -56,6 +58,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final Executor dbExecutor; private final EventBus eventBus; private final ContactId contactId; + private final TransportId transportId; private final int maxLatency; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; @@ -65,12 +68,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private volatile boolean interrupted = false; SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, - EventBus eventBus, ContactId contactId, int maxLatency, - StreamWriter streamWriter, SyncRecordWriter recordWriter) { + EventBus eventBus, ContactId contactId, TransportId transportId, + int maxLatency, StreamWriter streamWriter, + SyncRecordWriter recordWriter) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; this.contactId = contactId; + this.transportId = transportId; this.maxLatency = maxLatency; this.streamWriter = streamWriter; this.recordWriter = recordWriter; @@ -123,6 +128,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } else if (e instanceof LifecycleEvent) { LifecycleEvent l = (LifecycleEvent) e; if (l.getLifecycleState() == STOPPING) interrupt(); + } else if (e instanceof CloseSyncConnectionsEvent) { + CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e; + if (c.getTransportId().equals(transportId)) interrupt(); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index d35e1164b..7d17b0362 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -5,6 +5,7 @@ import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.SyncRecordReaderFactory; import org.briarproject.bramble.api.sync.SyncRecordWriter; @@ -53,22 +54,23 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { } @Override - public SyncSession createSimplexOutgoingSession(ContactId c, + public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, int maxLatency, StreamWriter streamWriter) { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); - return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, + return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, maxLatency, streamWriter, recordWriter); } @Override - public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, StreamWriter streamWriter) { + public SyncSession createDuplexOutgoingSession(ContactId c, + TransportId t, int maxLatency, int maxIdleTime, + StreamWriter streamWriter) { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); - return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, + return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t, maxLatency, maxIdleTime, streamWriter, recordWriter); } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImplTest.java index c87573827..e658fbe84 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionLimiterImplTest.java @@ -1,11 +1,13 @@ package org.briarproject.bramble.plugin.bluetooth; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.test.BrambleMockTestCase; import org.jmock.Expectations; +import org.junit.Before; import org.junit.Test; import static org.briarproject.bramble.plugin.bluetooth.BluetoothConnectionLimiter.STABILITY_PERIOD_MS; @@ -14,6 +16,7 @@ import static org.junit.Assert.assertTrue; public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase { + private final EventBus eventBus = context.mock(EventBus.class); private final Clock clock = context.mock(Clock.class); private final DuplexTransportConnection conn = context.mock(DuplexTransportConnection.class); @@ -24,11 +27,15 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase { private final long now = System.currentTimeMillis(); + private BluetoothConnectionLimiter limiter; + + @Before + public void setUp() { + limiter = new BluetoothConnectionLimiterImpl(eventBus, clock); + } + @Test public void testLimiterAllowsOneOutgoingConnection() { - BluetoothConnectionLimiter limiter = - new BluetoothConnectionLimiterImpl(clock); - expectGetCurrentTime(now); assertTrue(limiter.canOpenContactConnection()); @@ -41,9 +48,6 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase { @Test public void testLimiterAllowsSecondIncomingConnection() throws Exception { - BluetoothConnectionLimiter limiter = - new BluetoothConnectionLimiterImpl(clock); - expectGetCurrentTime(now); assertTrue(limiter.canOpenContactConnection()); @@ -64,11 +68,7 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase { } @Test - public void testLimiterAllowsSecondOutgoingConnectionWhenFirstIsStable() - throws Exception { - BluetoothConnectionLimiter limiter = - new BluetoothConnectionLimiterImpl(clock); - + public void testLimiterAllowsSecondOutgoingConnectionWhenFirstIsStable() { expectGetCurrentTime(now); assertTrue(limiter.canOpenContactConnection()); @@ -87,9 +87,6 @@ public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase { @Test public void testLimiterAllowsThirdIncomingConnectionWhenFirstTwoAreStable() throws Exception { - BluetoothConnectionLimiter limiter = - new BluetoothConnectionLimiterImpl(clock); - expectGetCurrentTime(now); assertTrue(limiter.canOpenContactConnection()); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index 430a9150f..df8dc7b87 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -4,6 +4,7 @@ import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.Message; @@ -23,6 +24,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getRandomId; +import static org.briarproject.bramble.test.TestUtils.getTransportId; public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @@ -36,14 +38,15 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { private final Executor dbExecutor = new ImmediateExecutor(); private final ContactId contactId = getContactId(); + private final TransportId transportId = getTransportId(); private final Message message = getMessage(new GroupId(getRandomId())); private final MessageId messageId = message.getId(); @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, - recordWriter); + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -76,8 +79,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { public void testSomethingToSend() throws Exception { Ack ack = new Ack(singletonList(messageId)); SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, - recordWriter); + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + streamWriter, recordWriter); Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false);