diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java index c29e5721e..c7c8113f9 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java @@ -1,17 +1,46 @@ package org.briarproject.bramble.api.plugin; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +@NotNullByDefault public interface ConnectionManager { + /** + * Manages an incoming connection from a contact over a simplex transport. + */ void manageIncomingConnection(TransportId t, TransportConnectionReader r); + /** + * Manages an incoming connection from a contact over a duplex transport. + */ void manageIncomingConnection(TransportId t, DuplexTransportConnection d); + /** + * Manages an incoming handshake connection from a pending contact over a + * duplex transport. + */ + void manageIncomingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d); + + /** + * Manages an outgoing connection to a contact over a simplex transport. + */ void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w); + /** + * Manages an outgoing connection to a contact over a duplex transport. + */ void manageOutgoingConnection(ContactId c, TransportId t, DuplexTransportConnection d); + + /** + * Manages an outgoing handshake connection to a pending contact over a + * duplex transport. + */ + void manageOutgoingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index 7cece262b..d3fec490b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -1,6 +1,11 @@ package org.briarproject.bramble.plugin; +import org.briarproject.bramble.api.contact.Contact; +import org.briarproject.bramble.api.contact.ContactExchangeManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; +import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; @@ -44,6 +49,8 @@ class ConnectionManagerImpl implements ConnectionManager { private final StreamReaderFactory streamReaderFactory; private final StreamWriterFactory streamWriterFactory; private final SyncSessionFactory syncSessionFactory; + private final HandshakeManager handshakeManager; + private final ContactExchangeManager contactExchangeManager; private final ConnectionRegistry connectionRegistry; @Inject @@ -51,12 +58,16 @@ class ConnectionManagerImpl implements ConnectionManager { KeyManager keyManager, StreamReaderFactory streamReaderFactory, StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, + HandshakeManager handshakeManager, + ContactExchangeManager contactExchangeManager, ConnectionRegistry connectionRegistry) { this.ioExecutor = ioExecutor; this.keyManager = keyManager; this.streamReaderFactory = streamReaderFactory; this.streamWriterFactory = streamWriterFactory; this.syncSessionFactory = syncSessionFactory; + this.handshakeManager = handshakeManager; + this.contactExchangeManager = contactExchangeManager; this.connectionRegistry = connectionRegistry; } @@ -72,6 +83,12 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new ManageIncomingDuplexConnection(t, d)); } + @Override + public void manageIncomingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d) { + ioExecutor.execute(new ManageIncomingHandshakeConnection(p, t, d)); + } + @Override public void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w) { @@ -84,6 +101,12 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d)); } + @Override + public void manageOutgoingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d) { + ioExecutor.execute(new ManageOutgoingHandshakeConnection(p, t, d)); + } + private byte[] readTag(InputStream in) throws IOException { byte[] tag = new byte[TAG_LENGTH]; read(in, tag); @@ -467,4 +490,205 @@ class ConnectionManagerImpl implements ConnectionManager { disposeOnError(writer); } } + + private class ManageIncomingHandshakeConnection implements Runnable { + + private final PendingContactId pendingContactId; + private final TransportId transportId; + private final DuplexTransportConnection connection; + private final TransportConnectionReader reader; + private final TransportConnectionWriter writer; + + private ManageIncomingHandshakeConnection( + PendingContactId pendingContactId, TransportId transportId, + DuplexTransportConnection connection) { + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.connection = connection; + reader = connection.getReader(); + writer = connection.getWriter(); + } + + @Override + public void run() { + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(false); + return; + } + if (ctxIn == null) { + LOG.info("Unrecognised tag"); + onError(false); + return; + } + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(true); + return; + } + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, + transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(true); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(true); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(true); + return; + } + // Handshake and exchange contacts + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + // Flush the output stream to send the outgoing stream header + StreamWriter out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + HandshakeResult result = handshakeManager.handshake( + pendingContactId, in, out); + Contact contact = contactExchangeManager.exchangeContacts( + pendingContactId, connection, result.getMasterKey(), + result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + manageOutgoingConnection(contact.getId(), transportId, + connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(true); + connectionRegistry.unregisterConnection(pendingContactId, + false); + } + } + + private void onError(boolean recognised) { + disposeOnError(reader, recognised); + disposeOnError(writer); + } + } + + private class ManageOutgoingHandshakeConnection implements Runnable { + + private final PendingContactId pendingContactId; + private final TransportId transportId; + private final DuplexTransportConnection connection; + private final TransportConnectionReader reader; + private final TransportConnectionWriter writer; + + private ManageOutgoingHandshakeConnection( + PendingContactId pendingContactId, TransportId transportId, + DuplexTransportConnection connection) { + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.connection = connection; + reader = connection.getReader(); + writer = connection.getWriter(); + } + + @Override + public void run() { + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, + transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(); + return; + } + // Flush the output stream to send the outgoing stream header + StreamWriter out; + try { + out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Unrecognised tags are suspicious in this case + if (ctxIn == null) { + LOG.warning("Unrecognised tag for returning stream"); + onError(); + return; + } + // Check that the stream comes from the expected pending contact + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(); + return; + } + if (!inPendingContactId.equals(pendingContactId)) { + LOG.warning("Wrong pending contact ID for returning stream"); + onError(); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(); + return; + } + // Handshake and exchange contacts + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + HandshakeResult result = handshakeManager.handshake( + pendingContactId, in, out); + Contact contact = contactExchangeManager.exchangeContacts( + pendingContactId, connection, result.getMasterKey(), + result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + manageOutgoingConnection(contact.getId(), transportId, + connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + connectionRegistry.unregisterConnection(pendingContactId, + false); + } + } + + private void onError() { + // 'Recognised' is always true for outgoing connections + disposeOnError(reader, true); + disposeOnError(writer); + } + } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java index 8b461c669..0409571f9 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTest.java @@ -3,9 +3,9 @@ package org.briarproject.bramble.contact; import org.briarproject.bramble.api.Pair; import org.briarproject.bramble.api.contact.Contact; import org.briarproject.bramble.api.contact.ContactManager; -import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; import org.briarproject.bramble.api.contact.PendingContact; import org.briarproject.bramble.api.contact.PendingContactState; +import org.briarproject.bramble.api.contact.event.ContactAddedEvent; import org.briarproject.bramble.api.crypto.PublicKey; import org.briarproject.bramble.api.crypto.SecretKey; import org.briarproject.bramble.api.identity.Identity; @@ -14,19 +14,14 @@ import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.TestDatabaseConfigModule; import org.briarproject.bramble.test.TestDuplexTransportConnection; -import org.briarproject.bramble.test.TestStreamWriter; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.File; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.util.Collection; import java.util.Random; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static junit.framework.TestCase.assertNotNull; @@ -34,12 +29,12 @@ import static junit.framework.TestCase.assertNull; import static junit.framework.TestCase.fail; import static org.briarproject.bramble.api.contact.PendingContactState.WAITING_FOR_CONNECTION; import static org.briarproject.bramble.test.TestDuplexTransportConnection.createPair; +import static org.briarproject.bramble.test.TestPluginConfigModule.DUPLEX_TRANSPORT_ID; import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory; import static org.briarproject.bramble.test.TestUtils.getSecretKey; import static org.briarproject.bramble.test.TestUtils.getTestDirectory; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class ContactExchangeIntegrationTest extends BrambleTestCase { @@ -116,8 +111,8 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase { fail(); } }); - aliceFinished.await(TIMEOUT, MILLISECONDS); - bobFinished.await(TIMEOUT, MILLISECONDS); + assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS)); + assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS)); assertContacts(verified, false); assertNoPendingContacts(); } @@ -155,8 +150,8 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase { fail(); } }); - aliceFinished.await(TIMEOUT, MILLISECONDS); - bobFinished.await(TIMEOUT, MILLISECONDS); + assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS)); + assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS)); assertContacts(verified, true); assertNoPendingContacts(); } @@ -168,54 +163,25 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase { PendingContact aliceFromBob = addPendingContact(bob, alice); assertPendingContacts(); - PipedInputStream aliceHandshakeIn = new PipedInputStream(); - PipedInputStream bobHandshakeIn = new PipedInputStream(); - OutputStream aliceHandshakeOut = new PipedOutputStream(bobHandshakeIn); - OutputStream bobHandshakeOut = new PipedOutputStream(aliceHandshakeIn); - AtomicReference aliceResult = new AtomicReference<>(); - AtomicReference bobResult = new AtomicReference<>(); - TestDuplexTransportConnection[] pair = createPair(); TestDuplexTransportConnection aliceConnection = pair[0]; TestDuplexTransportConnection bobConnection = pair[1]; CountDownLatch aliceFinished = new CountDownLatch(1); CountDownLatch bobFinished = new CountDownLatch(1); - boolean verified = random.nextBoolean(); - alice.getIoExecutor().execute(() -> { - try { - HandshakeResult result = alice.getHandshakeManager().handshake( - bobFromAlice.getId(), aliceHandshakeIn, - new TestStreamWriter(aliceHandshakeOut)); - aliceResult.set(result); - alice.getContactExchangeManager().exchangeContacts( - bobFromAlice.getId(), aliceConnection, - result.getMasterKey(), result.isAlice(), verified); - aliceFinished.countDown(); - } catch (Exception e) { - fail(); - } + alice.getEventBus().addListener(e -> { + if (e instanceof ContactAddedEvent) aliceFinished.countDown(); }); - bob.getIoExecutor().execute(() -> { - try { - HandshakeResult result = bob.getHandshakeManager().handshake( - aliceFromBob.getId(), bobHandshakeIn, - new TestStreamWriter(bobHandshakeOut)); - bobResult.set(result); - bob.getContactExchangeManager().exchangeContacts( - aliceFromBob.getId(), bobConnection, - result.getMasterKey(), result.isAlice(), verified); - bobFinished.countDown(); - } catch (Exception e) { - fail(); - } + alice.getConnectionManager().manageOutgoingConnection( + bobFromAlice.getId(), DUPLEX_TRANSPORT_ID, aliceConnection); + bob.getEventBus().addListener(e -> { + if (e instanceof ContactAddedEvent) bobFinished.countDown(); }); - aliceFinished.await(TIMEOUT, MILLISECONDS); - bobFinished.await(TIMEOUT, MILLISECONDS); - assertArrayEquals(aliceResult.get().getMasterKey().getBytes(), - bobResult.get().getMasterKey().getBytes()); - assertNotEquals(aliceResult.get().isAlice(), bobResult.get().isAlice()); - assertContacts(verified, true); + bob.getConnectionManager().manageIncomingConnection( + aliceFromBob.getId(), DUPLEX_TRANSPORT_ID, bobConnection); + assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS)); + assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS)); + assertContacts(false, true); assertNoPendingContacts(); } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java index d3d7dae2d..dd47cfbbd 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/contact/ContactExchangeIntegrationTestComponent.java @@ -4,10 +4,11 @@ import org.briarproject.bramble.BrambleCoreEagerSingletons; import org.briarproject.bramble.BrambleCoreModule; import org.briarproject.bramble.api.contact.ContactExchangeManager; import org.briarproject.bramble.api.contact.ContactManager; -import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.identity.IdentityManager; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.LifecycleManager; +import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.test.BrambleCoreIntegrationTestModule; import java.util.concurrent.Executor; @@ -24,11 +25,13 @@ import dagger.Component; interface ContactExchangeIntegrationTestComponent extends BrambleCoreEagerSingletons { + ConnectionManager getConnectionManager(); + ContactExchangeManager getContactExchangeManager(); ContactManager getContactManager(); - HandshakeManager getHandshakeManager(); + EventBus getEventBus(); IdentityManager getIdentityManager(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java index 6457f8191..007ec5f9d 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java @@ -21,6 +21,7 @@ public class TestDuplexTransportConnection private final TransportConnectionReader reader; private final TransportConnectionWriter writer; + @SuppressWarnings("WeakerAccess") public TestDuplexTransportConnection(InputStream in, OutputStream out) { reader = new TestTransportConnectionReader(in); writer = new TestTransportConnectionWriter(out); @@ -42,8 +43,9 @@ public class TestDuplexTransportConnection */ public static TestDuplexTransportConnection[] createPair() throws IOException { - PipedInputStream aliceIn = new PipedInputStream(); - PipedInputStream bobIn = new PipedInputStream(); + // Use 64k buffers to prevent deadlock + PipedInputStream aliceIn = new PipedInputStream(1 << 16); + PipedInputStream bobIn = new PipedInputStream(1 << 16); PipedOutputStream aliceOut = new PipedOutputStream(bobIn); PipedOutputStream bobOut = new PipedOutputStream(aliceIn); TestDuplexTransportConnection alice =