From c1101c7fe1e1c98d9ff35c553eef21f10ead6746 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 12 May 2020 17:08:04 +0100 Subject: [PATCH] Factor inner classes out of ConnectionManagerImpl. --- .../bramble/plugin/Connection.java | 61 ++ .../bramble/plugin/ConnectionFactory.java | 34 + .../bramble/plugin/ConnectionFactoryImpl.java | 109 +++ .../bramble/plugin/ConnectionManagerImpl.java | 668 +----------------- .../bramble/plugin/DuplexSyncConnection.java | 78 ++ .../bramble/plugin/HandshakeConnection.java | 52 ++ .../plugin/IncomingDuplexSyncConnection.java | 116 +++ .../plugin/IncomingHandshakeConnection.java | 105 +++ .../plugin/IncomingSimplexSyncConnection.java | 88 +++ .../plugin/OutgoingDuplexSyncConnection.java | 137 ++++ .../plugin/OutgoingHandshakeConnection.java | 129 ++++ .../plugin/OutgoingSimplexSyncConnection.java | 90 +++ .../bramble/plugin/PluginModule.java | 6 + .../bramble/plugin/SyncConnection.java | 44 ++ 14 files changed, 1064 insertions(+), 653 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/Connection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactory.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactoryImpl.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/DuplexSyncConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/HandshakeConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingDuplexSyncConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingHandshakeConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingSimplexSyncConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingDuplexSyncConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingHandshakeConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingSimplexSyncConnection.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/SyncConnection.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/Connection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/Connection.java new file mode 100644 index 000000000..e1dd8cf57 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/Connection.java @@ -0,0 +1,61 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.logging.Logger; + +import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; +import static org.briarproject.bramble.util.IoUtils.read; +import static org.briarproject.bramble.util.LogUtils.logException; + +@NotNullByDefault +abstract class Connection { + + protected static final Logger LOG = getLogger(Connection.class.getName()); + + final KeyManager keyManager; + final ConnectionRegistry connectionRegistry; + final StreamReaderFactory streamReaderFactory; + final StreamWriterFactory streamWriterFactory; + + Connection(KeyManager keyManager, ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory) { + this.keyManager = keyManager; + this.connectionRegistry = connectionRegistry; + this.streamReaderFactory = streamReaderFactory; + this.streamWriterFactory = streamWriterFactory; + } + + byte[] readTag(InputStream in) throws IOException { + byte[] tag = new byte[TAG_LENGTH]; + read(in, tag); + return tag; + } + + void disposeOnError(TransportConnectionReader reader, boolean recognised) { + try { + reader.dispose(true, recognised); + } catch (IOException e) { + logException(LOG, WARNING, e); + } + } + + void disposeOnError(TransportConnectionWriter writer) { + try { + writer.dispose(true); + } catch (IOException e) { + logException(LOG, WARNING, e); + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactory.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactory.java new file mode 100644 index 000000000..e7bd16d1b --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactory.java @@ -0,0 +1,34 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionManager; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; + +@NotNullByDefault +interface ConnectionFactory { + + Runnable createIncomingSimplexSyncConnection(TransportId t, + TransportConnectionReader r); + + Runnable createIncomingDuplexSyncConnection(TransportId t, + DuplexTransportConnection d); + + Runnable createIncomingHandshakeConnection( + ConnectionManager connectionManager, PendingContactId p, + TransportId t, DuplexTransportConnection d); + + Runnable createOutgoingSimplexSyncConnection(ContactId c, TransportId t, + TransportConnectionWriter w); + + Runnable createOutgoingDuplexSyncConnection(ContactId c, TransportId t, + DuplexTransportConnection d); + + Runnable createOutgoingHandshakeConnection( + ConnectionManager connectionManager, + PendingContactId p, TransportId t, DuplexTransportConnection d); +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactoryImpl.java new file mode 100644 index 000000000..af3d6d21a --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionFactoryImpl.java @@ -0,0 +1,109 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactExchangeManager; +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.lifecycle.IoExecutor; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionManager; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.util.concurrent.Executor; + +import javax.annotation.concurrent.Immutable; +import javax.inject.Inject; + +@Immutable +@NotNullByDefault +class ConnectionFactoryImpl implements ConnectionFactory { + + private final Executor ioExecutor; + private final KeyManager keyManager; + private final StreamReaderFactory streamReaderFactory; + private final StreamWriterFactory streamWriterFactory; + private final SyncSessionFactory syncSessionFactory; + private final HandshakeManager handshakeManager; + private final ContactExchangeManager contactExchangeManager; + private final ConnectionRegistry connectionRegistry; + private final TransportPropertyManager transportPropertyManager; + + @Inject + ConnectionFactoryImpl(@IoExecutor Executor ioExecutor, + KeyManager keyManager, StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + SyncSessionFactory syncSessionFactory, + HandshakeManager handshakeManager, + ContactExchangeManager contactExchangeManager, + ConnectionRegistry connectionRegistry, + TransportPropertyManager transportPropertyManager) { + this.ioExecutor = ioExecutor; + this.keyManager = keyManager; + this.streamReaderFactory = streamReaderFactory; + this.streamWriterFactory = streamWriterFactory; + this.syncSessionFactory = syncSessionFactory; + this.handshakeManager = handshakeManager; + this.contactExchangeManager = contactExchangeManager; + this.connectionRegistry = connectionRegistry; + this.transportPropertyManager = transportPropertyManager; + } + + @Override + public Runnable createIncomingSimplexSyncConnection(TransportId t, + TransportConnectionReader r) { + return new IncomingSimplexSyncConnection(keyManager, + connectionRegistry, streamReaderFactory, streamWriterFactory, + syncSessionFactory, transportPropertyManager, t, r); + } + + @Override + public Runnable createIncomingDuplexSyncConnection(TransportId t, + DuplexTransportConnection d) { + return new IncomingDuplexSyncConnection(keyManager, connectionRegistry, + streamReaderFactory, streamWriterFactory, syncSessionFactory, + transportPropertyManager, ioExecutor, t, d); + } + + @Override + public Runnable createIncomingHandshakeConnection( + ConnectionManager connectionManager, PendingContactId p, + TransportId t, DuplexTransportConnection d) { + return new IncomingHandshakeConnection(keyManager, connectionRegistry, + streamReaderFactory, streamWriterFactory, handshakeManager, + contactExchangeManager, connectionManager, p, t, d); + } + + @Override + public Runnable createOutgoingSimplexSyncConnection(ContactId c, + TransportId t, TransportConnectionWriter w) { + return new OutgoingSimplexSyncConnection(keyManager, + connectionRegistry, streamReaderFactory, streamWriterFactory, + syncSessionFactory, transportPropertyManager, c, t, w); + } + + @Override + public Runnable createOutgoingDuplexSyncConnection(ContactId c, + TransportId t, DuplexTransportConnection d) { + return new OutgoingDuplexSyncConnection(keyManager, connectionRegistry, + streamReaderFactory, streamWriterFactory, syncSessionFactory, + transportPropertyManager, ioExecutor, c, t, d); + } + + @Override + public Runnable createOutgoingHandshakeConnection( + ConnectionManager connectionManager, PendingContactId p, + TransportId t, DuplexTransportConnection d) { + return new OutgoingHandshakeConnection(keyManager, connectionRegistry, + streamReaderFactory, streamWriterFactory, handshakeManager, + contactExchangeManager, connectionManager, p, t, d); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index ba45b6e1a..201da7980 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -1,709 +1,71 @@ package org.briarproject.bramble.plugin; -import org.briarproject.bramble.api.contact.Contact; -import org.briarproject.bramble.api.contact.ContactExchangeManager; import org.briarproject.bramble.api.contact.ContactId; -import org.briarproject.bramble.api.contact.HandshakeManager; -import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; import org.briarproject.bramble.api.contact.PendingContactId; -import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.ConnectionManager; -import org.briarproject.bramble.api.plugin.ConnectionRegistry; import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; -import org.briarproject.bramble.api.properties.TransportProperties; -import org.briarproject.bramble.api.properties.TransportPropertyManager; -import org.briarproject.bramble.api.sync.SyncSession; -import org.briarproject.bramble.api.sync.SyncSessionFactory; -import org.briarproject.bramble.api.transport.KeyManager; -import org.briarproject.bramble.api.transport.StreamContext; -import org.briarproject.bramble.api.transport.StreamReaderFactory; -import org.briarproject.bramble.api.transport.StreamWriter; -import org.briarproject.bramble.api.transport.StreamWriterFactory; -import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.Executor; -import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.inject.Inject; -import static java.util.logging.Level.WARNING; -import static java.util.logging.Logger.getLogger; -import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; -import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; -import static org.briarproject.bramble.util.IoUtils.read; -import static org.briarproject.bramble.util.LogUtils.logException; - @NotNullByDefault class ConnectionManagerImpl implements ConnectionManager { - private static final Logger LOG = - getLogger(ConnectionManagerImpl.class.getName()); - private final Executor ioExecutor; - private final KeyManager keyManager; - private final StreamReaderFactory streamReaderFactory; - private final StreamWriterFactory streamWriterFactory; - private final SyncSessionFactory syncSessionFactory; - private final HandshakeManager handshakeManager; - private final ContactExchangeManager contactExchangeManager; - private final ConnectionRegistry connectionRegistry; - private final TransportPropertyManager transportPropertyManager; + private final ConnectionFactory connectionFactory; @Inject ConnectionManagerImpl(@IoExecutor Executor ioExecutor, - KeyManager keyManager, StreamReaderFactory streamReaderFactory, - StreamWriterFactory streamWriterFactory, - SyncSessionFactory syncSessionFactory, - HandshakeManager handshakeManager, - ContactExchangeManager contactExchangeManager, - ConnectionRegistry connectionRegistry, - TransportPropertyManager transportPropertyManager) { + ConnectionFactory connectionFactory) { this.ioExecutor = ioExecutor; - this.keyManager = keyManager; - this.streamReaderFactory = streamReaderFactory; - this.streamWriterFactory = streamWriterFactory; - this.syncSessionFactory = syncSessionFactory; - this.handshakeManager = handshakeManager; - this.contactExchangeManager = contactExchangeManager; - this.connectionRegistry = connectionRegistry; - this.transportPropertyManager = transportPropertyManager; + this.connectionFactory = connectionFactory; } @Override public void manageIncomingConnection(TransportId t, TransportConnectionReader r) { - ioExecutor.execute(new ManageIncomingSimplexConnection(t, r)); + ioExecutor.execute( + connectionFactory.createIncomingSimplexSyncConnection(t, r)); } @Override public void manageIncomingConnection(TransportId t, DuplexTransportConnection d) { - ioExecutor.execute(new ManageIncomingDuplexConnection(t, d)); + ioExecutor.execute( + connectionFactory.createIncomingDuplexSyncConnection(t, d)); } @Override public void manageIncomingConnection(PendingContactId p, TransportId t, DuplexTransportConnection d) { - ioExecutor.execute(new ManageIncomingHandshakeConnection(p, t, d)); + ioExecutor.execute(connectionFactory + .createIncomingHandshakeConnection(this, p, t, d)); } @Override public void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w) { - ioExecutor.execute(new ManageOutgoingSimplexConnection(c, t, w)); + ioExecutor.execute( + connectionFactory.createOutgoingSimplexSyncConnection(c, t, w)); } @Override public void manageOutgoingConnection(ContactId c, TransportId t, DuplexTransportConnection d) { - ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d)); + ioExecutor.execute( + connectionFactory.createOutgoingDuplexSyncConnection(c, t, d)); } @Override public void manageOutgoingConnection(PendingContactId p, TransportId t, DuplexTransportConnection d) { - ioExecutor.execute(new ManageOutgoingHandshakeConnection(p, t, d)); - } - - private byte[] readTag(InputStream in) throws IOException { - byte[] tag = new byte[TAG_LENGTH]; - read(in, tag); - return tag; - } - - private SyncSession createIncomingSession(StreamContext ctx, - TransportConnectionReader r) throws IOException { - InputStream streamReader = streamReaderFactory.createStreamReader( - r.getInputStream(), ctx); - ContactId c = requireNonNull(ctx.getContactId()); - return syncSessionFactory.createIncomingSession(c, streamReader); - } - - private SyncSession createSimplexOutgoingSession(StreamContext ctx, - TransportConnectionWriter w) throws IOException { - StreamWriter streamWriter = streamWriterFactory.createStreamWriter( - w.getOutputStream(), ctx); - ContactId c = requireNonNull(ctx.getContactId()); - return syncSessionFactory.createSimplexOutgoingSession(c, - w.getMaxLatency(), streamWriter); - } - - private SyncSession createDuplexOutgoingSession(StreamContext ctx, - TransportConnectionWriter w) throws IOException { - StreamWriter streamWriter = streamWriterFactory.createStreamWriter( - w.getOutputStream(), ctx); - ContactId c = requireNonNull(ctx.getContactId()); - return syncSessionFactory.createDuplexOutgoingSession(c, - w.getMaxLatency(), w.getMaxIdleTime(), streamWriter); - } - - private void disposeOnError(TransportConnectionReader reader, - boolean recognised) { - try { - reader.dispose(true, recognised); - } catch (IOException e) { - logException(LOG, WARNING, e); - } - } - - private void disposeOnError(TransportConnectionWriter writer) { - try { - writer.dispose(true); - } catch (IOException e) { - logException(LOG, WARNING, e); - } - } - - private class ManageIncomingSimplexConnection implements Runnable { - - private final TransportId transportId; - private final TransportConnectionReader reader; - - private ManageIncomingSimplexConnection(TransportId transportId, - TransportConnectionReader reader) { - this.transportId = transportId; - this.reader = reader; - } - - @Override - public void run() { - // Read and recognise the tag - StreamContext ctx; - try { - byte[] tag = readTag(reader.getInputStream()); - ctx = keyManager.getStreamContext(transportId, tag); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - onError(false); - return; - } - if (ctx == null) { - LOG.info("Unrecognised tag"); - onError(false); - return; - } - ContactId contactId = ctx.getContactId(); - if (contactId == null) { - LOG.warning("Received rendezvous stream, expected contact"); - onError(true); - return; - } - if (ctx.isHandshakeMode()) { - // TODO: Support handshake mode for contacts - LOG.warning("Received handshake tag, expected rotation mode"); - onError(true); - return; - } - connectionRegistry.registerConnection(contactId, transportId, true); - try { - // Create and run the incoming session - createIncomingSession(ctx, reader).run(); - reader.dispose(false, true); - } catch (IOException e) { - logException(LOG, WARNING, e); - onError(true); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - true); - } - } - - private void onError(boolean recognised) { - disposeOnError(reader, recognised); - } - } - - private class ManageOutgoingSimplexConnection implements Runnable { - - private final ContactId contactId; - private final TransportId transportId; - private final TransportConnectionWriter writer; - - private ManageOutgoingSimplexConnection(ContactId contactId, - TransportId transportId, TransportConnectionWriter writer) { - this.contactId = contactId; - this.transportId = transportId; - this.writer = writer; - } - - @Override - public void run() { - // Allocate a stream context - StreamContext ctx; - try { - ctx = keyManager.getStreamContext(contactId, transportId); - } catch (DbException e) { - logException(LOG, WARNING, e); - onError(); - return; - } - if (ctx == null) { - LOG.warning("Could not allocate stream context"); - onError(); - return; - } - connectionRegistry.registerConnection(contactId, transportId, - false); - try { - // Create and run the outgoing session - createSimplexOutgoingSession(ctx, writer).run(); - writer.dispose(false); - } catch (IOException e) { - logException(LOG, WARNING, e); - onError(); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - false); - } - } - - private void onError() { - disposeOnError(writer); - } - } - - private class ManageIncomingDuplexConnection implements Runnable { - - private final TransportId transportId; - private final TransportConnectionReader reader; - private final TransportConnectionWriter writer; - private final TransportProperties remote; - - @Nullable - private volatile SyncSession outgoingSession = null; - - private ManageIncomingDuplexConnection(TransportId transportId, - DuplexTransportConnection connection) { - this.transportId = transportId; - reader = connection.getReader(); - writer = connection.getWriter(); - remote = connection.getRemoteProperties(); - } - - @Override - public void run() { - // Read and recognise the tag - StreamContext ctx; - try { - byte[] tag = readTag(reader.getInputStream()); - ctx = keyManager.getStreamContext(transportId, tag); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - onReadError(false); - return; - } - if (ctx == null) { - LOG.info("Unrecognised tag"); - onReadError(false); - return; - } - ContactId contactId = ctx.getContactId(); - if (contactId == null) { - LOG.warning("Expected contact tag, got rendezvous tag"); - onReadError(true); - return; - } - if (ctx.isHandshakeMode()) { - // TODO: Support handshake mode for contacts - LOG.warning("Received handshake tag, expected rotation mode"); - onReadError(true); - return; - } - connectionRegistry.registerConnection(contactId, transportId, true); - // Start the outgoing session on another thread - ioExecutor.execute(() -> runOutgoingSession(contactId)); - try { - // Store any transport properties discovered from the connection - transportPropertyManager.addRemotePropertiesFromConnection( - contactId, transportId, remote); - // Create and run the incoming session - createIncomingSession(ctx, reader).run(); - reader.dispose(false, true); - // Interrupt the outgoing session so it finishes cleanly - SyncSession out = outgoingSession; - if (out != null) out.interrupt(); - } catch (DbException | IOException e) { - logException(LOG, WARNING, e); - onReadError(true); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - true); - } - } - - private void runOutgoingSession(ContactId contactId) { - // Allocate a stream context - StreamContext ctx; - try { - ctx = keyManager.getStreamContext(contactId, transportId); - } catch (DbException e) { - logException(LOG, WARNING, e); - onWriteError(); - return; - } - if (ctx == null) { - LOG.warning("Could not allocate stream context"); - onWriteError(); - return; - } - try { - // Create and run the outgoing session - SyncSession out = createDuplexOutgoingSession(ctx, writer); - outgoingSession = out; - out.run(); - writer.dispose(false); - } catch (IOException e) { - logException(LOG, WARNING, e); - onWriteError(); - } - } - - private void onReadError(boolean recognised) { - disposeOnError(reader, recognised); - disposeOnError(writer); - // Interrupt the outgoing session so it finishes - SyncSession out = outgoingSession; - if (out != null) out.interrupt(); - } - - private void onWriteError() { - disposeOnError(reader, true); - disposeOnError(writer); - } - } - - private class ManageOutgoingDuplexConnection implements Runnable { - - private final ContactId contactId; - private final TransportId transportId; - private final TransportConnectionReader reader; - private final TransportConnectionWriter writer; - private final TransportProperties remote; - - @Nullable - private volatile SyncSession outgoingSession = null; - - private ManageOutgoingDuplexConnection(ContactId contactId, - TransportId transportId, DuplexTransportConnection connection) { - this.contactId = contactId; - this.transportId = transportId; - reader = connection.getReader(); - writer = connection.getWriter(); - remote = connection.getRemoteProperties(); - } - - @Override - public void run() { - // Allocate a stream context - StreamContext ctx; - try { - ctx = keyManager.getStreamContext(contactId, transportId); - } catch (DbException e) { - logException(LOG, WARNING, e); - onWriteError(); - return; - } - if (ctx == null) { - LOG.warning("Could not allocate stream context"); - onWriteError(); - return; - } - if (ctx.isHandshakeMode()) { - // TODO: Support handshake mode for contacts - LOG.warning("Cannot use handshake mode stream context"); - onWriteError(); - return; - } - // Start the incoming session on another thread - ioExecutor.execute(this::runIncomingSession); - try { - // Create and run the outgoing session - SyncSession out = createDuplexOutgoingSession(ctx, writer); - outgoingSession = out; - out.run(); - writer.dispose(false); - } catch (IOException e) { - logException(LOG, WARNING, e); - onWriteError(); - } - } - - private void runIncomingSession() { - // Read and recognise the tag - StreamContext ctx; - try { - byte[] tag = readTag(reader.getInputStream()); - ctx = keyManager.getStreamContext(transportId, tag); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - onReadError(); - return; - } - // Unrecognised tags are suspicious in this case - if (ctx == null) { - LOG.warning("Unrecognised tag for returning stream"); - onReadError(); - return; - } - // Check that the stream comes from the expected contact - ContactId inContactId = ctx.getContactId(); - if (inContactId == null) { - LOG.warning("Expected contact tag, got rendezvous tag"); - onReadError(); - return; - } - if (!contactId.equals(inContactId)) { - LOG.warning("Wrong contact ID for returning stream"); - onReadError(); - return; - } - if (ctx.isHandshakeMode()) { - // TODO: Support handshake mode for contacts - LOG.warning("Received handshake tag, expected rotation mode"); - onReadError(); - return; - } - connectionRegistry.registerConnection(contactId, transportId, - false); - try { - // Store any transport properties discovered from the connection - transportPropertyManager.addRemotePropertiesFromConnection( - contactId, transportId, remote); - // Create and run the incoming session - createIncomingSession(ctx, reader).run(); - reader.dispose(false, true); - // Interrupt the outgoing session so it finishes cleanly - SyncSession out = outgoingSession; - if (out != null) out.interrupt(); - } catch (DbException | IOException e) { - logException(LOG, WARNING, e); - onReadError(); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - false); - } - } - - private void onReadError() { - // 'Recognised' is always true for outgoing connections - disposeOnError(reader, true); - disposeOnError(writer); - // Interrupt the outgoing session so it finishes - SyncSession out = outgoingSession; - if (out != null) out.interrupt(); - } - - private void onWriteError() { - disposeOnError(reader, true); - disposeOnError(writer); - } - } - - private class ManageIncomingHandshakeConnection implements Runnable { - - private final PendingContactId pendingContactId; - private final TransportId transportId; - private final DuplexTransportConnection connection; - private final TransportConnectionReader reader; - private final TransportConnectionWriter writer; - - private ManageIncomingHandshakeConnection( - PendingContactId pendingContactId, TransportId transportId, - DuplexTransportConnection connection) { - this.pendingContactId = pendingContactId; - this.transportId = transportId; - this.connection = connection; - reader = connection.getReader(); - writer = connection.getWriter(); - } - - @Override - public void run() { - // Read and recognise the tag - StreamContext ctxIn; - try { - byte[] tag = readTag(reader.getInputStream()); - ctxIn = keyManager.getStreamContext(transportId, tag); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - onError(false); - return; - } - if (ctxIn == null) { - LOG.info("Unrecognised tag"); - onError(false); - return; - } - PendingContactId inPendingContactId = ctxIn.getPendingContactId(); - if (inPendingContactId == null) { - LOG.warning("Expected rendezvous tag, got contact tag"); - onError(true); - return; - } - // Allocate the outgoing stream context - StreamContext ctxOut; - try { - ctxOut = keyManager.getStreamContext(pendingContactId, - transportId); - } catch (DbException e) { - logException(LOG, WARNING, e); - onError(true); - return; - } - if (ctxOut == null) { - LOG.warning("Could not allocate stream context"); - onError(true); - return; - } - // Close the connection if it's redundant - if (!connectionRegistry.registerConnection(pendingContactId)) { - LOG.info("Redundant rendezvous connection"); - onError(true); - return; - } - // Handshake and exchange contacts - try { - InputStream in = streamReaderFactory.createStreamReader( - reader.getInputStream(), ctxIn); - // Flush the output stream to send the outgoing stream header - StreamWriter out = streamWriterFactory.createStreamWriter( - writer.getOutputStream(), ctxOut); - out.getOutputStream().flush(); - HandshakeResult result = handshakeManager.handshake( - pendingContactId, in, out); - Contact contact = contactExchangeManager.exchangeContacts( - pendingContactId, connection, result.getMasterKey(), - result.isAlice(), false); - connectionRegistry.unregisterConnection(pendingContactId, true); - // Reuse the connection as a transport connection - manageOutgoingConnection(contact.getId(), transportId, - connection); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - onError(true); - connectionRegistry.unregisterConnection(pendingContactId, - false); - } - } - - private void onError(boolean recognised) { - disposeOnError(reader, recognised); - disposeOnError(writer); - } - } - - private class ManageOutgoingHandshakeConnection implements Runnable { - - private final PendingContactId pendingContactId; - private final TransportId transportId; - private final DuplexTransportConnection connection; - private final TransportConnectionReader reader; - private final TransportConnectionWriter writer; - - private ManageOutgoingHandshakeConnection( - PendingContactId pendingContactId, TransportId transportId, - DuplexTransportConnection connection) { - this.pendingContactId = pendingContactId; - this.transportId = transportId; - this.connection = connection; - reader = connection.getReader(); - writer = connection.getWriter(); - } - - @Override - public void run() { - // Allocate the outgoing stream context - StreamContext ctxOut; - try { - ctxOut = keyManager.getStreamContext(pendingContactId, - transportId); - } catch (DbException e) { - logException(LOG, WARNING, e); - onError(); - return; - } - if (ctxOut == null) { - LOG.warning("Could not allocate stream context"); - onError(); - return; - } - // Flush the output stream to send the outgoing stream header - StreamWriter out; - try { - out = streamWriterFactory.createStreamWriter( - writer.getOutputStream(), ctxOut); - out.getOutputStream().flush(); - } catch (IOException e) { - logException(LOG, WARNING, e); - onError(); - return; - } - // Read and recognise the tag - StreamContext ctxIn; - try { - byte[] tag = readTag(reader.getInputStream()); - ctxIn = keyManager.getStreamContext(transportId, tag); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - onError(); - return; - } - // Unrecognised tags are suspicious in this case - if (ctxIn == null) { - LOG.warning("Unrecognised tag for returning stream"); - onError(); - return; - } - // Check that the stream comes from the expected pending contact - PendingContactId inPendingContactId = ctxIn.getPendingContactId(); - if (inPendingContactId == null) { - LOG.warning("Expected rendezvous tag, got contact tag"); - onError(); - return; - } - if (!inPendingContactId.equals(pendingContactId)) { - LOG.warning("Wrong pending contact ID for returning stream"); - onError(); - return; - } - // Close the connection if it's redundant - if (!connectionRegistry.registerConnection(pendingContactId)) { - LOG.info("Redundant rendezvous connection"); - onError(); - return; - } - // Handshake and exchange contacts - try { - InputStream in = streamReaderFactory.createStreamReader( - reader.getInputStream(), ctxIn); - HandshakeResult result = handshakeManager.handshake( - pendingContactId, in, out); - Contact contact = contactExchangeManager.exchangeContacts( - pendingContactId, connection, result.getMasterKey(), - result.isAlice(), false); - connectionRegistry.unregisterConnection(pendingContactId, true); - // Reuse the connection as a transport connection - manageOutgoingConnection(contact.getId(), transportId, - connection); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - onError(); - connectionRegistry.unregisterConnection(pendingContactId, - false); - } - } - - private void onError() { - // 'Recognised' is always true for outgoing connections - disposeOnError(reader, true); - disposeOnError(writer); - } + ioExecutor.execute(connectionFactory + .createOutgoingHandshakeConnection(this, p, t, d)); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/DuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/DuplexSyncConnection.java new file mode 100644 index 000000000..bf4526086 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/DuplexSyncConnection.java @@ -0,0 +1,78 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.properties.TransportProperties; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.SyncSession; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; +import java.util.concurrent.Executor; + +import javax.annotation.Nullable; + +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; + +@NotNullByDefault +abstract class DuplexSyncConnection extends SyncConnection { + + final Executor ioExecutor; + final TransportId transportId; + final TransportConnectionReader reader; + final TransportConnectionWriter writer; + final TransportProperties remote; + + @Nullable + volatile SyncSession outgoingSession = null; + + DuplexSyncConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + SyncSessionFactory syncSessionFactory, + TransportPropertyManager transportPropertyManager, + Executor ioExecutor, TransportId transportId, + DuplexTransportConnection connection) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory, syncSessionFactory, + transportPropertyManager); + this.ioExecutor = ioExecutor; + this.transportId = transportId; + reader = connection.getReader(); + writer = connection.getWriter(); + remote = connection.getRemoteProperties(); + } + + void onReadError(boolean recognised) { + disposeOnError(reader, recognised); + disposeOnError(writer); + // Interrupt the outgoing session so it finishes + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); + } + + void onWriteError() { + disposeOnError(reader, true); + disposeOnError(writer); + } + + SyncSession createDuplexOutgoingSession(StreamContext ctx, + TransportConnectionWriter w) throws IOException { + StreamWriter streamWriter = streamWriterFactory.createStreamWriter( + w.getOutputStream(), ctx); + ContactId c = requireNonNull(ctx.getContactId()); + return syncSessionFactory.createDuplexOutgoingSession(c, + w.getMaxLatency(), w.getMaxIdleTime(), streamWriter); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/HandshakeConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/HandshakeConnection.java new file mode 100644 index 000000000..1e3f672fa --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/HandshakeConnection.java @@ -0,0 +1,52 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactExchangeManager; +import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.plugin.ConnectionManager; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +abstract class HandshakeConnection extends Connection { + + final HandshakeManager handshakeManager; + final ContactExchangeManager contactExchangeManager; + final ConnectionManager connectionManager; + final PendingContactId pendingContactId; + final TransportId transportId; + final DuplexTransportConnection connection; + final TransportConnectionReader reader; + final TransportConnectionWriter writer; + + HandshakeConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + HandshakeManager handshakeManager, + ContactExchangeManager contactExchangeManager, + ConnectionManager connectionManager, + PendingContactId pendingContactId, + TransportId transportId, DuplexTransportConnection connection) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory); + this.handshakeManager = handshakeManager; + this.contactExchangeManager = contactExchangeManager; + this.connectionManager = connectionManager; + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.connection = connection; + reader = connection.getReader(); + writer = connection.getWriter(); + } + + void onError(boolean recognised) { + disposeOnError(reader, recognised); + disposeOnError(writer); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingDuplexSyncConnection.java new file mode 100644 index 000000000..2a50ab151 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingDuplexSyncConnection.java @@ -0,0 +1,116 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.SyncSession; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; +import java.util.concurrent.Executor; + +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.util.LogUtils.logException; + +class IncomingDuplexSyncConnection extends DuplexSyncConnection + implements Runnable { + + IncomingDuplexSyncConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + SyncSessionFactory syncSessionFactory, + TransportPropertyManager transportPropertyManager, + Executor ioExecutor, TransportId transportId, + DuplexTransportConnection connection) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory, syncSessionFactory, + transportPropertyManager, ioExecutor, transportId, connection); + } + + @Override + public void run() { + // Read and recognise the tag + StreamContext ctx; + try { + byte[] tag = readTag(reader.getInputStream()); + ctx = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onReadError(false); + return; + } + if (ctx == null) { + LOG.info("Unrecognised tag"); + onReadError(false); + return; + } + ContactId contactId = ctx.getContactId(); + if (contactId == null) { + LOG.warning("Expected contact tag, got rendezvous tag"); + onReadError(true); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Received handshake tag, expected rotation mode"); + onReadError(true); + return; + } + connectionRegistry.registerConnection(contactId, transportId, true); + // Start the outgoing session on another thread + ioExecutor.execute(() -> runOutgoingSession(contactId)); + try { + // Store any transport properties discovered from the connection + transportPropertyManager.addRemotePropertiesFromConnection( + contactId, transportId, remote); + // Create and run the incoming session + createIncomingSession(ctx, reader).run(); + reader.dispose(false, true); + // Interrupt the outgoing session so it finishes cleanly + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); + } catch (DbException | IOException e) { + logException(LOG, WARNING, e); + onReadError(true); + } finally { + connectionRegistry.unregisterConnection(contactId, transportId, + true); + } + } + + private void runOutgoingSession(ContactId contactId) { + // Allocate a stream context + StreamContext ctx; + try { + ctx = keyManager.getStreamContext(contactId, transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onWriteError(); + return; + } + if (ctx == null) { + LOG.warning("Could not allocate stream context"); + onWriteError(); + return; + } + try { + // Create and run the outgoing session + SyncSession out = createDuplexOutgoingSession(ctx, writer); + outgoingSession = out; + out.run(); + writer.dispose(false); + } catch (IOException e) { + logException(LOG, WARNING, e); + onWriteError(); + } + } +} + diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingHandshakeConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingHandshakeConnection.java new file mode 100644 index 000000000..a0e4db535 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingHandshakeConnection.java @@ -0,0 +1,105 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactExchangeManager; +import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.plugin.ConnectionManager; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; +import java.io.InputStream; + +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.util.LogUtils.logException; + +class IncomingHandshakeConnection extends HandshakeConnection + implements Runnable { + + IncomingHandshakeConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + HandshakeManager handshakeManager, + ContactExchangeManager contactExchangeManager, + ConnectionManager connectionManager, + PendingContactId pendingContactId, + TransportId transportId, DuplexTransportConnection connection) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory, handshakeManager, contactExchangeManager, + connectionManager, pendingContactId, transportId, connection); + } + + @Override + public void run() { + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(false); + return; + } + if (ctxIn == null) { + LOG.info("Unrecognised tag"); + onError(false); + return; + } + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(true); + return; + } + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(true); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(true); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(true); + return; + } + // Handshake and exchange contacts + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + // Flush the output stream to send the outgoing stream header + StreamWriter out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + HandshakeResult result = + handshakeManager.handshake(pendingContactId, in, out); + contactExchangeManager.exchangeContacts(pendingContactId, + connection, result.getMasterKey(), result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + connectionManager.manageIncomingConnection(transportId, connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(true); + connectionRegistry.unregisterConnection(pendingContactId, false); + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingSimplexSyncConnection.java new file mode 100644 index 000000000..2011412ae --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/IncomingSimplexSyncConnection.java @@ -0,0 +1,88 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; + +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.util.LogUtils.logException; + +@NotNullByDefault +class IncomingSimplexSyncConnection extends SyncConnection implements Runnable { + + private final TransportId transportId; + private final TransportConnectionReader reader; + + IncomingSimplexSyncConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + SyncSessionFactory syncSessionFactory, + TransportPropertyManager transportPropertyManager, + TransportId transportId, TransportConnectionReader reader) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory, syncSessionFactory, + transportPropertyManager); + this.transportId = transportId; + this.reader = reader; + } + + @Override + public void run() { + // Read and recognise the tag + StreamContext ctx; + try { + byte[] tag = readTag(reader.getInputStream()); + ctx = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(false); + return; + } + if (ctx == null) { + LOG.info("Unrecognised tag"); + onError(false); + return; + } + ContactId contactId = ctx.getContactId(); + if (contactId == null) { + LOG.warning("Received rendezvous stream, expected contact"); + onError(true); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Received handshake tag, expected rotation mode"); + onError(true); + return; + } + connectionRegistry.registerConnection(contactId, transportId, true); + try { + // Create and run the incoming session + createIncomingSession(ctx, reader).run(); + reader.dispose(false, true); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(true); + } finally { + connectionRegistry.unregisterConnection(contactId, transportId, + true); + } + } + + private void onError(boolean recognised) { + disposeOnError(reader, recognised); + } +} + diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingDuplexSyncConnection.java new file mode 100644 index 000000000..bc4aebcce --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingDuplexSyncConnection.java @@ -0,0 +1,137 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.SyncSession; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; +import java.util.concurrent.Executor; + +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.util.LogUtils.logException; + +class OutgoingDuplexSyncConnection extends DuplexSyncConnection + implements Runnable { + + private final ContactId contactId; + + OutgoingDuplexSyncConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + SyncSessionFactory syncSessionFactory, + TransportPropertyManager transportPropertyManager, + Executor ioExecutor, ContactId contactId, TransportId transportId, + DuplexTransportConnection connection) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory, syncSessionFactory, + transportPropertyManager, ioExecutor, transportId, connection); + this.contactId = contactId; + } + + @Override + public void run() { + // Allocate a stream context + StreamContext ctx; + try { + ctx = keyManager.getStreamContext(contactId, transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onWriteError(); + return; + } + if (ctx == null) { + LOG.warning("Could not allocate stream context"); + onWriteError(); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Cannot use handshake mode stream context"); + onWriteError(); + return; + } + // Start the incoming session on another thread + ioExecutor.execute(this::runIncomingSession); + try { + // Create and run the outgoing session + SyncSession out = createDuplexOutgoingSession(ctx, writer); + outgoingSession = out; + out.run(); + writer.dispose(false); + } catch (IOException e) { + logException(LOG, WARNING, e); + onWriteError(); + } + } + + private void runIncomingSession() { + // Read and recognise the tag + StreamContext ctx; + try { + byte[] tag = readTag(reader.getInputStream()); + ctx = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onReadError(); + return; + } + // Unrecognised tags are suspicious in this case + if (ctx == null) { + LOG.warning("Unrecognised tag for returning stream"); + onReadError(); + return; + } + // Check that the stream comes from the expected contact + ContactId inContactId = ctx.getContactId(); + if (inContactId == null) { + LOG.warning("Expected contact tag, got rendezvous tag"); + onReadError(); + return; + } + if (!contactId.equals(inContactId)) { + LOG.warning("Wrong contact ID for returning stream"); + onReadError(); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Received handshake tag, expected rotation mode"); + onReadError(); + return; + } + connectionRegistry.registerConnection(contactId, transportId, + false); + try { + // Store any transport properties discovered from the connection + transportPropertyManager.addRemotePropertiesFromConnection( + contactId, transportId, remote); + // Create and run the incoming session + createIncomingSession(ctx, reader).run(); + reader.dispose(false, true); + // Interrupt the outgoing session so it finishes cleanly + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); + } catch (DbException | IOException e) { + logException(LOG, WARNING, e); + onReadError(); + } finally { + connectionRegistry.unregisterConnection(contactId, transportId, + false); + } + } + + private void onReadError() { + // 'Recognised' is always true for outgoing connections + onReadError(true); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingHandshakeConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingHandshakeConnection.java new file mode 100644 index 000000000..e9aeb9ff7 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingHandshakeConnection.java @@ -0,0 +1,129 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.Contact; +import org.briarproject.bramble.api.contact.ContactExchangeManager; +import org.briarproject.bramble.api.contact.HandshakeManager; +import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionManager; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; +import java.io.InputStream; + +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.util.LogUtils.logException; + +@NotNullByDefault +class OutgoingHandshakeConnection extends HandshakeConnection + implements Runnable { + + OutgoingHandshakeConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + HandshakeManager handshakeManager, + ContactExchangeManager contactExchangeManager, + ConnectionManager connectionManager, + PendingContactId pendingContactId, + TransportId transportId, DuplexTransportConnection connection) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory, handshakeManager, contactExchangeManager, + connectionManager, pendingContactId, transportId, connection); + } + + @Override + public void run() { + // Allocate the outgoing stream context + StreamContext ctxOut; + try { + ctxOut = keyManager.getStreamContext(pendingContactId, transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + if (ctxOut == null) { + LOG.warning("Could not allocate stream context"); + onError(); + return; + } + // Flush the output stream to send the outgoing stream header + StreamWriter out; + try { + out = streamWriterFactory.createStreamWriter( + writer.getOutputStream(), ctxOut); + out.getOutputStream().flush(); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Read and recognise the tag + StreamContext ctxIn; + try { + byte[] tag = readTag(reader.getInputStream()); + ctxIn = keyManager.getStreamContext(transportId, tag); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + // Unrecognised tags are suspicious in this case + if (ctxIn == null) { + LOG.warning("Unrecognised tag for returning stream"); + onError(); + return; + } + // Check that the stream comes from the expected pending contact + PendingContactId inPendingContactId = ctxIn.getPendingContactId(); + if (inPendingContactId == null) { + LOG.warning("Expected rendezvous tag, got contact tag"); + onError(); + return; + } + if (!inPendingContactId.equals(pendingContactId)) { + LOG.warning("Wrong pending contact ID for returning stream"); + onError(); + return; + } + // Close the connection if it's redundant + if (!connectionRegistry.registerConnection(pendingContactId)) { + LOG.info("Redundant rendezvous connection"); + onError(); + return; + } + // Handshake and exchange contacts + try { + InputStream in = streamReaderFactory.createStreamReader( + reader.getInputStream(), ctxIn); + HandshakeResult result = + handshakeManager.handshake(pendingContactId, in, out); + Contact contact = contactExchangeManager.exchangeContacts( + pendingContactId, connection, result.getMasterKey(), + result.isAlice(), false); + connectionRegistry.unregisterConnection(pendingContactId, true); + // Reuse the connection as a transport connection + connectionManager.manageOutgoingConnection(contact.getId(), + transportId, connection); + } catch (IOException | DbException e) { + logException(LOG, WARNING, e); + onError(); + connectionRegistry.unregisterConnection(pendingContactId, false); + } + } + + private void onError() { + // 'Recognised' is always true for outgoing connections + onError(true); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingSimplexSyncConnection.java new file mode 100644 index 000000000..0df604f7c --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/OutgoingSimplexSyncConnection.java @@ -0,0 +1,90 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.SyncSession; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; + +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; +import static org.briarproject.bramble.util.LogUtils.logException; + +@NotNullByDefault +class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { + + private final ContactId contactId; + private final TransportId transportId; + private final TransportConnectionWriter writer; + + OutgoingSimplexSyncConnection(KeyManager keyManager, + ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + SyncSessionFactory syncSessionFactory, + TransportPropertyManager transportPropertyManager, + ContactId contactId, TransportId transportId, + TransportConnectionWriter writer) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory, syncSessionFactory, + transportPropertyManager); + this.contactId = contactId; + this.transportId = transportId; + this.writer = writer; + } + + @Override + public void run() { + // Allocate a stream context + StreamContext ctx; + try { + ctx = keyManager.getStreamContext(contactId, transportId); + } catch (DbException e) { + logException(LOG, WARNING, e); + onError(); + return; + } + if (ctx == null) { + LOG.warning("Could not allocate stream context"); + onError(); + return; + } + connectionRegistry.registerConnection(contactId, transportId, false); + try { + // Create and run the outgoing session + createSimplexOutgoingSession(ctx, writer).run(); + writer.dispose(false); + } catch (IOException e) { + logException(LOG, WARNING, e); + onError(); + } finally { + connectionRegistry.unregisterConnection(contactId, transportId, + false); + } + } + + private void onError() { + disposeOnError(writer); + } + + private SyncSession createSimplexOutgoingSession(StreamContext ctx, + TransportConnectionWriter w) throws IOException { + StreamWriter streamWriter = streamWriterFactory.createStreamWriter( + w.getOutputStream(), ctx); + ContactId c = requireNonNull(ctx.getContactId()); + return syncSessionFactory.createSimplexOutgoingSession(c, + w.getMaxLatency(), streamWriter); + } +} + diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java index bca57cc1e..f60bad880 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java @@ -36,6 +36,12 @@ public class PluginModule { return connectionManager; } + @Provides + ConnectionFactory provideConnectionFactory( + ConnectionFactoryImpl connectionFactory) { + return connectionFactory; + } + @Provides @Singleton ConnectionRegistry provideConnectionRegistry( diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/SyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/SyncConnection.java new file mode 100644 index 000000000..8e668b814 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/SyncConnection.java @@ -0,0 +1,44 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.SyncSession; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; + +import java.io.IOException; +import java.io.InputStream; + +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; + +@NotNullByDefault +class SyncConnection extends Connection { + + final SyncSessionFactory syncSessionFactory; + final TransportPropertyManager transportPropertyManager; + + SyncConnection(KeyManager keyManager, ConnectionRegistry connectionRegistry, + StreamReaderFactory streamReaderFactory, + StreamWriterFactory streamWriterFactory, + SyncSessionFactory syncSessionFactory, + TransportPropertyManager transportPropertyManager) { + super(keyManager, connectionRegistry, streamReaderFactory, + streamWriterFactory); + this.syncSessionFactory = syncSessionFactory; + this.transportPropertyManager = transportPropertyManager; + } + + SyncSession createIncomingSession(StreamContext ctx, + TransportConnectionReader r) throws IOException { + InputStream streamReader = streamReaderFactory.createStreamReader( + r.getInputStream(), ctx); + ContactId c = requireNonNull(ctx.getContactId()); + return syncSessionFactory.createIncomingSession(c, streamReader); + } +}