From 8f21e07840416b6b2ff838420a44ecbce326d5fd Mon Sep 17 00:00:00 2001 From: akwizgran Date: Mon, 27 May 2019 15:19:38 +0100 Subject: [PATCH 1/5] Add rendezvous connection support to connection manager. --- .../bramble/api/plugin/ConnectionManager.java | 29 +++ .../bramble/plugin/ConnectionManagerImpl.java | 231 ++++++++++++++++++ 2 files changed, 260 insertions(+) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java index c29e5721e..c7c8113f9 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java @@ -1,17 +1,46 @@ package org.briarproject.bramble.api.plugin; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +@NotNullByDefault public interface ConnectionManager { + /** + * Manages an incoming connection from a contact over a simplex transport. + */ void manageIncomingConnection(TransportId t, TransportConnectionReader r); + /** + * Manages an incoming connection from a contact over a duplex transport. + */ void manageIncomingConnection(TransportId t, DuplexTransportConnection d); + /** + * Manages an incoming handshake connection from a pending contact over a + * duplex transport. + */ + void manageIncomingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d); + + /** + * Manages an outgoing connection to a contact over a simplex transport. + */ void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w); + /** + * Manages an outgoing connection to a contact over a duplex transport. + */ void manageOutgoingConnection(ContactId c, TransportId t, DuplexTransportConnection d); + + /** + * Manages an outgoing handshake connection to a pending contact over a + * duplex transport. + */ + void manageOutgoingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index 7cece262b..8fefc4697 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -1,6 +1,11 @@ package org.briarproject.bramble.plugin; +import org.briarproject.bramble.api.contact.Contact; +import org.briarproject.bramble.api.contact.ContactExchangeManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; +import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; @@ -44,6 +49,8 @@ class ConnectionManagerImpl implements ConnectionManager { private final StreamReaderFactory streamReaderFactory; private final StreamWriterFactory streamWriterFactory; private final SyncSessionFactory syncSessionFactory; + private final HandshakeManager handshakeManager; + private final ContactExchangeManager contactExchangeManager; private final ConnectionRegistry connectionRegistry; @Inject @@ -51,12 +58,16 @@ class ConnectionManagerImpl implements ConnectionManager { KeyManager keyManager, StreamReaderFactory streamReaderFactory, StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, + HandshakeManager handshakeManager, + ContactExchangeManager contactExchangeManager, ConnectionRegistry connectionRegistry) { this.ioExecutor = ioExecutor; this.keyManager = keyManager; this.streamReaderFactory = streamReaderFactory; this.streamWriterFactory = streamWriterFactory; this.syncSessionFactory = syncSessionFactory; + this.handshakeManager = handshakeManager; + this.contactExchangeManager = contactExchangeManager; this.connectionRegistry = connectionRegistry; } @@ -72,6 +83,12 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new ManageIncomingDuplexConnection(t, d)); } + @Override + public void manageIncomingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d) { + ioExecutor.execute(new ManageIncomingHandshakeConnection(p, t, d)); + } + @Override public void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w) { @@ -84,6 +101,12 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d)); } + @Override + public void manageOutgoingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d) { + ioExecutor.execute(new ManageOutgoingHandshakeConnection(p, t, d)); + } + private byte[] readTag(InputStream in) throws IOException { byte[] tag = new byte[TAG_LENGTH]; read(in, tag); @@ -467,4 +490,212 @@ class ConnectionManagerImpl implements ConnectionManager { disposeOnError(writer); } } + + private class ManageIncomingHandshakeConnection implements Runnable { + + private final PendingContactId pendingContactId; + private final TransportId transportId; + private final DuplexTransportConnection connection; + private final TransportConnectionReader reader; + private final TransportConnectionWriter writer; + + private ManageIncomingHandshakeConnection( + PendingContactId pendingContactId, TransportId transportId, + DuplexTransportConnection connection) { + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.connection = connection; + reader = connection.getReader(); + writer = connection.getWriter(); + } + + @Override + public void run() { + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(false); + return; + } + if (ctxIn == null) { + LOG.info("Unrecognised tag"); + onError(false); + return; + } + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(true); + return; + } + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, + transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(true); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(true); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(true); + return; + } + // Flush the output stream to send the outgoing stream header + StreamWriter out; + try { + out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(true); + return; + } + // Handshake and exchange contacts + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + HandshakeResult result = handshakeManager.handshake( + pendingContactId, in, out); + Contact contact = contactExchangeManager.exchangeContacts( + pendingContactId, connection, result.getMasterKey(), + result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + manageOutgoingConnection(contact.getId(), transportId, + connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(true); + connectionRegistry.unregisterConnection(pendingContactId, + false); + } + } + + private void onError(boolean recognised) { + disposeOnError(reader, recognised); + disposeOnError(writer); + } + } + + private class ManageOutgoingHandshakeConnection implements Runnable { + + private final PendingContactId pendingContactId; + private final TransportId transportId; + private final DuplexTransportConnection connection; + private final TransportConnectionReader reader; + private final TransportConnectionWriter writer; + + private ManageOutgoingHandshakeConnection( + PendingContactId pendingContactId, TransportId transportId, + DuplexTransportConnection connection) { + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.connection = connection; + reader = connection.getReader(); + writer = connection.getWriter(); + } + + @Override + public void run() { + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, + transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(); + return; + } + // Flush the output stream to send the outgoing stream header + StreamWriter out; + try { + out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Unrecognised tags are suspicious in this case + if (ctxIn == null) { + LOG.warning("Unrecognised tag for returning stream"); + onError(); + return; + } + // Check that the stream comes from the expected pending contact + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(); + return; + } + if (!inPendingContactId.equals(pendingContactId)) { + LOG.warning("Wrong pending contact ID for returning stream"); + onError(); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(); + return; + } + // Create and run the handshake session + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + HandshakeResult result = handshakeManager.handshake( + pendingContactId, in, out); + Contact contact = contactExchangeManager.exchangeContacts( + pendingContactId, connection, result.getMasterKey(), + result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + manageOutgoingConnection(contact.getId(), transportId, + connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + connectionRegistry.unregisterConnection(pendingContactId, + false); + } + } + + private void onError() { + // 'Recognised' is always true for outgoing connections + disposeOnError(reader, true); + disposeOnError(writer); + } + } } From f95bb9b28e8315b4bad3907039bdee491f3cf002 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Mon, 3 Jun 2019 15:01:31 +0100 Subject: [PATCH 2/5] Add integration test for new connection manager methods. --- .../ContactExchangeIntegrationTest.java | 72 ++++++------------- ...ntactExchangeIntegrationTestComponent.java | 7 +- 2 files changed, 26 insertions(+), 53 deletions(-) diff --git a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java index 8b461c669..6d1bd9d1a 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java @@ -3,9 +3,9 @@ package org.briarproject.bramble.contact; import org.briarproject.bramble.api.Pair; import org.briarproject.bramble.api.contact.Contact; import org.briarproject.bramble.api.contact.ContactManager; -import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; import org.briarproject.bramble.api.contact.PendingContact; import org.briarproject.bramble.api.contact.PendingContactState; +import org.briarproject.bramble.api.contact.event.ContactAddedEvent; import org.briarproject.bramble.api.crypto.PublicKey; import org.briarproject.bramble.api.crypto.SecretKey; import org.briarproject.bramble.api.identity.Identity; @@ -14,19 +14,14 @@ import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.TestDatabaseConfigModule; import org.briarproject.bramble.test.TestDuplexTransportConnection; -import org.briarproject.bramble.test.TestStreamWriter; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.File; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.util.Collection; import java.util.Random; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static junit.framework.TestCase.assertNotNull; @@ -34,12 +29,12 @@ import static junit.framework.TestCase.assertNull; import static junit.framework.TestCase.fail; import static org.briarproject.bramble.api.contact.PendingContactState.WAITING_FOR_CONNECTION; import static org.briarproject.bramble.test.TestDuplexTransportConnection.createPair; +import static org.briarproject.bramble.test.TestPluginConfigModule.DUPLEX_TRANSPORT_ID; import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory; import static org.briarproject.bramble.test.TestUtils.getSecretKey; import static org.briarproject.bramble.test.TestUtils.getTestDirectory; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class ContactExchangeIntegrationTest extends BrambleTestCase { @@ -116,8 +111,8 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase { fail(); } }); - aliceFinished.await(TIMEOUT, MILLISECONDS); - bobFinished.await(TIMEOUT, MILLISECONDS); + assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS)); + assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS)); assertContacts(verified, false); assertNoPendingContacts(); } @@ -155,8 +150,8 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase { fail(); } }); - aliceFinished.await(TIMEOUT, MILLISECONDS); - bobFinished.await(TIMEOUT, MILLISECONDS); + assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS)); + assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS)); assertContacts(verified, true); assertNoPendingContacts(); } @@ -168,54 +163,29 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase { PendingContact aliceFromBob = addPendingContact(bob, alice); assertPendingContacts(); - PipedInputStream aliceHandshakeIn = new PipedInputStream(); - PipedInputStream bobHandshakeIn = new PipedInputStream(); - OutputStream aliceHandshakeOut = new PipedOutputStream(bobHandshakeIn); - OutputStream bobHandshakeOut = new PipedOutputStream(aliceHandshakeIn); - AtomicReference aliceResult = new AtomicReference<>(); - AtomicReference bobResult = new AtomicReference<>(); - TestDuplexTransportConnection[] pair = createPair(); TestDuplexTransportConnection aliceConnection = pair[0]; TestDuplexTransportConnection bobConnection = pair[1]; CountDownLatch aliceFinished = new CountDownLatch(1); CountDownLatch bobFinished = new CountDownLatch(1); - boolean verified = random.nextBoolean(); - alice.getIoExecutor().execute(() -> { - try { - HandshakeResult result = alice.getHandshakeManager().handshake( - bobFromAlice.getId(), aliceHandshakeIn, - new TestStreamWriter(aliceHandshakeOut)); - aliceResult.set(result); - alice.getContactExchangeManager().exchangeContacts( - bobFromAlice.getId(), aliceConnection, - result.getMasterKey(), result.isAlice(), verified); - aliceFinished.countDown(); - } catch (Exception e) { - fail(); - } + alice.getEventBus().addListener(e -> { + if (e instanceof ContactAddedEvent) aliceFinished.countDown(); }); - bob.getIoExecutor().execute(() -> { - try { - HandshakeResult result = bob.getHandshakeManager().handshake( - aliceFromBob.getId(), bobHandshakeIn, - new TestStreamWriter(bobHandshakeOut)); - bobResult.set(result); - bob.getContactExchangeManager().exchangeContacts( - aliceFromBob.getId(), bobConnection, - result.getMasterKey(), result.isAlice(), verified); - bobFinished.countDown(); - } catch (Exception e) { - fail(); - } + alice.getIoExecutor().execute(() -> + alice.getConnectionManager().manageOutgoingConnection( + bobFromAlice.getId(), DUPLEX_TRANSPORT_ID, + aliceConnection)); + bob.getEventBus().addListener(e -> { + if (e instanceof ContactAddedEvent) bobFinished.countDown(); }); - aliceFinished.await(TIMEOUT, MILLISECONDS); - bobFinished.await(TIMEOUT, MILLISECONDS); - assertArrayEquals(aliceResult.get().getMasterKey().getBytes(), - bobResult.get().getMasterKey().getBytes()); - assertNotEquals(aliceResult.get().isAlice(), bobResult.get().isAlice()); - assertContacts(verified, true); + bob.getIoExecutor().execute(() -> + bob.getConnectionManager().manageIncomingConnection( + aliceFromBob.getId(), DUPLEX_TRANSPORT_ID, + bobConnection)); + assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS)); + assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS)); + assertContacts(false, true); assertNoPendingContacts(); } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java index d3d7dae2d..dd47cfbbd 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java @@ -4,10 +4,11 @@ import org.briarproject.bramble.BrambleCoreEagerSingletons; import org.briarproject.bramble.BrambleCoreModule; import org.briarproject.bramble.api.contact.ContactExchangeManager; import org.briarproject.bramble.api.contact.ContactManager; -import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.identity.IdentityManager; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.LifecycleManager; +import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.test.BrambleCoreIntegrationTestModule; import java.util.concurrent.Executor; @@ -24,11 +25,13 @@ import dagger.Component; interface ContactExchangeIntegrationTestComponent extends BrambleCoreEagerSingletons { + ConnectionManager getConnectionManager(); + ContactExchangeManager getContactExchangeManager(); ContactManager getContactManager(); - HandshakeManager getHandshakeManager(); + EventBus getEventBus(); IdentityManager getIdentityManager(); From 0b764a01dd37ef60b59de2a81d12f59c8fa0b68b Mon Sep 17 00:00:00 2001 From: akwizgran Date: Mon, 3 Jun 2019 17:07:24 +0100 Subject: [PATCH 3/5] Use larger buffer in test connections to prevent deadlock. --- .../bramble/test/TestDuplexTransportConnection.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java index 6457f8191..007ec5f9d 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java @@ -21,6 +21,7 @@ public class TestDuplexTransportConnection private final TransportConnectionReader reader; private final TransportConnectionWriter writer; + @SuppressWarnings("WeakerAccess") public TestDuplexTransportConnection(InputStream in, OutputStream out) { reader = new TestTransportConnectionReader(in); writer = new TestTransportConnectionWriter(out); @@ -42,8 +43,9 @@ public class TestDuplexTransportConnection */ public static TestDuplexTransportConnection[] createPair() throws IOException { - PipedInputStream aliceIn = new PipedInputStream(); - PipedInputStream bobIn = new PipedInputStream(); + // Use 64k buffers to prevent deadlock + PipedInputStream aliceIn = new PipedInputStream(1 << 16); + PipedInputStream bobIn = new PipedInputStream(1 << 16); PipedOutputStream aliceOut = new PipedOutputStream(bobIn); PipedOutputStream bobOut = new PipedOutputStream(aliceIn); TestDuplexTransportConnection alice = From 9ffd1ec2c239b35ee6e5af54d73e150620b17adf Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 4 Jun 2019 14:20:57 +0100 Subject: [PATCH 4/5] Unregister connection if sending stream header fails. --- .../bramble/plugin/ConnectionManagerImpl.java | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index 8fefc4697..d3fec490b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -553,21 +553,14 @@ class ConnectionManagerImpl implements ConnectionManager { onError(true); return; } - // Flush the output stream to send the outgoing stream header - StreamWriter out; - try { - out = streamWriterFactory.createStreamWriter( - writer.getOutputStream(), ctxOut); - out.getOutputStream().flush(); - } catch (IOException e) { - logException(LOG, WARNING, e); - onError(true); - return; - } // Handshake and exchange contacts try { InputStream in = streamReaderFactory.createStreamReader( reader.getInputStream(), ctxIn); + // Flush the output stream to send the outgoing stream header + StreamWriter out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); HandshakeResult result = handshakeManager.handshake( pendingContactId, in, out); Contact contact = contactExchangeManager.exchangeContacts( @@ -671,7 +664,7 @@ class ConnectionManagerImpl implements ConnectionManager { onError(); return; } - // Create and run the handshake session + // Handshake and exchange contacts try { InputStream in = streamReaderFactory.createStreamReader( reader.getInputStream(), ctxIn); From c536782e0167b0b99d589e1161938b6db9d87a47 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 4 Jun 2019 14:23:47 +0100 Subject: [PATCH 5/5] Remove redundant use of IO executor. --- .../contact/ContactExchangeIntegrationTest.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java index 6d1bd9d1a..0409571f9 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java @@ -172,17 +172,13 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase { alice.getEventBus().addListener(e -> { if (e instanceof ContactAddedEvent) aliceFinished.countDown(); }); - alice.getIoExecutor().execute(() -> - alice.getConnectionManager().manageOutgoingConnection( - bobFromAlice.getId(), DUPLEX_TRANSPORT_ID, - aliceConnection)); + alice.getConnectionManager().manageOutgoingConnection( + bobFromAlice.getId(), DUPLEX_TRANSPORT_ID, aliceConnection); bob.getEventBus().addListener(e -> { if (e instanceof ContactAddedEvent) bobFinished.countDown(); }); - bob.getIoExecutor().execute(() -> - bob.getConnectionManager().manageIncomingConnection( - aliceFromBob.getId(), DUPLEX_TRANSPORT_ID, - bobConnection)); + bob.getConnectionManager().manageIncomingConnection( + aliceFromBob.getId(), DUPLEX_TRANSPORT_ID, bobConnection); assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS)); assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS)); assertContacts(false, true);