diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java index c29e5721e..c7c8113f9 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/ConnectionManager.java @@ -1,17 +1,46 @@ package org.briarproject.bramble.api.plugin; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +@NotNullByDefault public interface ConnectionManager { + /** + * Manages an incoming connection from a contact over a simplex transport. + */ void manageIncomingConnection(TransportId t, TransportConnectionReader r); + /** + * Manages an incoming connection from a contact over a duplex transport. + */ void manageIncomingConnection(TransportId t, DuplexTransportConnection d); + /** + * Manages an incoming handshake connection from a pending contact over a + * duplex transport. + */ + void manageIncomingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d); + + /** + * Manages an outgoing connection to a contact over a simplex transport. + */ void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w); + /** + * Manages an outgoing connection to a contact over a duplex transport. + */ void manageOutgoingConnection(ContactId c, TransportId t, DuplexTransportConnection d); + + /** + * Manages an outgoing handshake connection to a pending contact over a + * duplex transport. + */ + void manageOutgoingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index 7cece262b..8fefc4697 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -1,6 +1,11 @@ package org.briarproject.bramble.plugin; +import org.briarproject.bramble.api.contact.Contact; +import org.briarproject.bramble.api.contact.ContactExchangeManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; +import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; @@ -44,6 +49,8 @@ class ConnectionManagerImpl implements ConnectionManager { private final StreamReaderFactory streamReaderFactory; private final StreamWriterFactory streamWriterFactory; private final SyncSessionFactory syncSessionFactory; + private final HandshakeManager handshakeManager; + private final ContactExchangeManager contactExchangeManager; private final ConnectionRegistry connectionRegistry; @Inject @@ -51,12 +58,16 @@ class ConnectionManagerImpl implements ConnectionManager { KeyManager keyManager, StreamReaderFactory streamReaderFactory, StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, + HandshakeManager handshakeManager, + ContactExchangeManager contactExchangeManager, ConnectionRegistry connectionRegistry) { this.ioExecutor = ioExecutor; this.keyManager = keyManager; this.streamReaderFactory = streamReaderFactory; this.streamWriterFactory = streamWriterFactory; this.syncSessionFactory = syncSessionFactory; + this.handshakeManager = handshakeManager; + this.contactExchangeManager = contactExchangeManager; this.connectionRegistry = connectionRegistry; } @@ -72,6 +83,12 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new ManageIncomingDuplexConnection(t, d)); } + @Override + public void manageIncomingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d) { + ioExecutor.execute(new ManageIncomingHandshakeConnection(p, t, d)); + } + @Override public void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w) { @@ -84,6 +101,12 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d)); } + @Override + public void manageOutgoingConnection(PendingContactId p, TransportId t, + DuplexTransportConnection d) { + ioExecutor.execute(new ManageOutgoingHandshakeConnection(p, t, d)); + } + private byte[] readTag(InputStream in) throws IOException { byte[] tag = new byte[TAG_LENGTH]; read(in, tag); @@ -467,4 +490,212 @@ class ConnectionManagerImpl implements ConnectionManager { disposeOnError(writer); } } + + private class ManageIncomingHandshakeConnection implements Runnable { + + private final PendingContactId pendingContactId; + private final TransportId transportId; + private final DuplexTransportConnection connection; + private final TransportConnectionReader reader; + private final TransportConnectionWriter writer; + + private ManageIncomingHandshakeConnection( + PendingContactId pendingContactId, TransportId transportId, + DuplexTransportConnection connection) { + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.connection = connection; + reader = connection.getReader(); + writer = connection.getWriter(); + } + + @Override + public void run() { + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(false); + return; + } + if (ctxIn == null) { + LOG.info("Unrecognised tag"); + onError(false); + return; + } + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(true); + return; + } + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, + transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(true); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(true); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(true); + return; + } + // Flush the output stream to send the outgoing stream header + StreamWriter out; + try { + out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(true); + return; + } + // Handshake and exchange contacts + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + HandshakeResult result = handshakeManager.handshake( + pendingContactId, in, out); + Contact contact = contactExchangeManager.exchangeContacts( + pendingContactId, connection, result.getMasterKey(), + result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + manageOutgoingConnection(contact.getId(), transportId, + connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(true); + connectionRegistry.unregisterConnection(pendingContactId, + false); + } + } + + private void onError(boolean recognised) { + disposeOnError(reader, recognised); + disposeOnError(writer); + } + } + + private class ManageOutgoingHandshakeConnection implements Runnable { + + private final PendingContactId pendingContactId; + private final TransportId transportId; + private final DuplexTransportConnection connection; + private final TransportConnectionReader reader; + private final TransportConnectionWriter writer; + + private ManageOutgoingHandshakeConnection( + PendingContactId pendingContactId, TransportId transportId, + DuplexTransportConnection connection) { + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.connection = connection; + reader = connection.getReader(); + writer = connection.getWriter(); + } + + @Override + public void run() { + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, + transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(); + return; + } + // Flush the output stream to send the outgoing stream header + StreamWriter out; + try { + out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Unrecognised tags are suspicious in this case + if (ctxIn == null) { + LOG.warning("Unrecognised tag for returning stream"); + onError(); + return; + } + // Check that the stream comes from the expected pending contact + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(); + return; + } + if (!inPendingContactId.equals(pendingContactId)) { + LOG.warning("Wrong pending contact ID for returning stream"); + onError(); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(); + return; + } + // Create and run the handshake session + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + HandshakeResult result = handshakeManager.handshake( + pendingContactId, in, out); + Contact contact = contactExchangeManager.exchangeContacts( + pendingContactId, connection, result.getMasterKey(), + result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + manageOutgoingConnection(contact.getId(), transportId, + connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + connectionRegistry.unregisterConnection(pendingContactId, + false); + } + } + + private void onError() { + // 'Recognised' is always true for outgoing connections + disposeOnError(reader, true); + disposeOnError(writer); + } + } }