From 717b2d176ec6620d43bfe62473fa2bebbde4d5f1 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Fri, 24 May 2019 18:08:06 +0100 Subject: [PATCH 1/3] Clean up connection manager, ready for pending contacts. --- .../bramble/plugin/ConnectionManagerImpl.java | 262 ++++++++++-------- 1 file changed, 139 insertions(+), 123 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index baab0ccc2..eab0d11e2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -3,6 +3,7 @@ package org.briarproject.bramble.plugin; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.lifecycle.IoExecutor; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.api.plugin.ConnectionRegistry; import org.briarproject.bramble.api.plugin.TransportConnectionReader; @@ -17,22 +18,26 @@ import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriterFactory; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.Executor; import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.inject.Inject; import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; +import static org.briarproject.bramble.util.IoUtils.read; import static org.briarproject.bramble.util.LogUtils.logException; +@NotNullByDefault class ConnectionManagerImpl implements ConnectionManager { private static final Logger LOG = - Logger.getLogger(ConnectionManagerImpl.class.getName()); + getLogger(ConnectionManagerImpl.class.getName()); private final Executor ioExecutor; private final KeyManager keyManager; @@ -79,16 +84,9 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d)); } - private byte[] readTag(TransportConnectionReader r) throws IOException { - // Read the tag + private byte[] readTag(InputStream in) throws IOException { byte[] tag = new byte[TAG_LENGTH]; - InputStream in = r.getInputStream(); - int offset = 0; - while (offset < tag.length) { - int read = in.read(tag, offset, tag.length - offset); - if (read == -1) throw new EOFException(); - offset += read; - } + read(in, tag); return tag; } @@ -96,28 +94,43 @@ class ConnectionManagerImpl implements ConnectionManager { TransportConnectionReader r) throws IOException { InputStream streamReader = streamReaderFactory.createStreamReader( r.getInputStream(), ctx); - // TODO: Pending contacts, handshake mode - return syncSessionFactory.createIncomingSession(ctx.getContactId(), - streamReader); + ContactId c = requireNonNull(ctx.getContactId()); + return syncSessionFactory.createIncomingSession(c, streamReader); } private SyncSession createSimplexOutgoingSession(StreamContext ctx, TransportConnectionWriter w) throws IOException { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); - // TODO: Pending contacts, handshake mode - return syncSessionFactory.createSimplexOutgoingSession( - ctx.getContactId(), w.getMaxLatency(), streamWriter); + ContactId c = requireNonNull(ctx.getContactId()); + return syncSessionFactory.createSimplexOutgoingSession(c, + w.getMaxLatency(), streamWriter); } private SyncSession createDuplexOutgoingSession(StreamContext ctx, TransportConnectionWriter w) throws IOException { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); - // TODO: Pending contacts, handshake mode - return syncSessionFactory.createDuplexOutgoingSession( - ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(), - streamWriter); + ContactId c = requireNonNull(ctx.getContactId()); + return syncSessionFactory.createDuplexOutgoingSession(c, + w.getMaxLatency(), w.getMaxIdleTime(), streamWriter); + } + + private void disposeOnError(TransportConnectionReader reader, + boolean recognised) { + try { + reader.dispose(true, recognised); + } catch (IOException e) { + logException(LOG, WARNING, e); + } + } + + private void disposeOnError(TransportConnectionWriter writer) { + try { + writer.dispose(true); + } catch (IOException e) { + logException(LOG, WARNING, e); + } } private class ManageIncomingSimplexConnection implements Runnable { @@ -136,40 +149,46 @@ class ConnectionManagerImpl implements ConnectionManager { // Read and recognise the tag StreamContext ctx; try { - byte[] tag = readTag(reader); + byte[] tag = readTag(reader.getInputStream()); ctx = keyManager.getStreamContext(transportId, tag); } catch (IOException | DbException e) { logException(LOG, WARNING, e); - disposeReader(true, false); + onError(false); return; } if (ctx == null) { LOG.info("Unrecognised tag"); - disposeReader(false, false); + onError(false); return; } - // TODO: Pending contacts ContactId contactId = ctx.getContactId(); + if (contactId == null) { + LOG.warning("Received rendezvous stream, expected contact"); + onError(true); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Received handshake tag, expected rotation mode"); + onError(true); + return; + } connectionRegistry.registerConnection(contactId, transportId, true); try { // Create and run the incoming session createIncomingSession(ctx, reader).run(); - disposeReader(false, true); + reader.dispose(false, true); } catch (IOException e) { logException(LOG, WARNING, e); - disposeReader(true, true); + onError(true); } finally { connectionRegistry.unregisterConnection(contactId, transportId, true); } } - private void disposeReader(boolean exception, boolean recognised) { - try { - reader.dispose(exception, recognised); - } catch (IOException e) { - logException(LOG, WARNING, e); - } + private void onError(boolean recognised) { + disposeOnError(reader, recognised); } } @@ -194,12 +213,12 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(contactId, transportId); } catch (DbException e) { logException(LOG, WARNING, e); - disposeWriter(true); + onError(); return; } if (ctx == null) { LOG.warning("Could not allocate stream context"); - disposeWriter(true); + onError(); return; } connectionRegistry.registerConnection(contactId, transportId, @@ -207,22 +226,18 @@ class ConnectionManagerImpl implements ConnectionManager { try { // Create and run the outgoing session createSimplexOutgoingSession(ctx, writer).run(); - disposeWriter(false); + writer.dispose(false); } catch (IOException e) { logException(LOG, WARNING, e); - disposeWriter(true); + onError(); } finally { connectionRegistry.unregisterConnection(contactId, transportId, false); } } - private void disposeWriter(boolean exception) { - try { - writer.dispose(exception); - } catch (IOException e) { - logException(LOG, WARNING, e); - } + private void onError() { + disposeOnError(writer); } } @@ -232,15 +247,14 @@ class ConnectionManagerImpl implements ConnectionManager { private final TransportConnectionReader reader; private final TransportConnectionWriter writer; - private volatile ContactId contactId = null; - private volatile SyncSession incomingSession = null; + @Nullable private volatile SyncSession outgoingSession = null; private ManageIncomingDuplexConnection(TransportId transportId, - DuplexTransportConnection transport) { + DuplexTransportConnection connection) { this.transportId = transportId; - reader = transport.getReader(); - writer = transport.getWriter(); + reader = connection.getReader(); + writer = connection.getWriter(); } @Override @@ -248,82 +262,79 @@ class ConnectionManagerImpl implements ConnectionManager { // Read and recognise the tag StreamContext ctx; try { - byte[] tag = readTag(reader); + byte[] tag = readTag(reader.getInputStream()); ctx = keyManager.getStreamContext(transportId, tag); } catch (IOException | DbException e) { logException(LOG, WARNING, e); - disposeReader(true, false); + onError(false); return; } if (ctx == null) { LOG.info("Unrecognised tag"); - disposeReader(false, false); + onError(false); + return; + } + ContactId contactId = ctx.getContactId(); + if (contactId == null) { + LOG.warning("Expected contact tag, got rendezvous tag"); + onError(true); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Received handshake tag, expected rotation mode"); + onError(true); return; } - contactId = ctx.getContactId(); connectionRegistry.registerConnection(contactId, transportId, true); // Start the outgoing session on another thread - ioExecutor.execute(this::runOutgoingSession); + ioExecutor.execute(() -> runOutgoingSession(contactId)); try { // Create and run the incoming session - incomingSession = createIncomingSession(ctx, reader); - incomingSession.run(); - disposeReader(false, true); + createIncomingSession(ctx, reader).run(); + reader.dispose(false, true); + // Interrupt the outgoing session so it finishes cleanly + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); } catch (IOException e) { logException(LOG, WARNING, e); - disposeReader(true, true); + onError(true); } finally { connectionRegistry.unregisterConnection(contactId, transportId, true); } } - private void runOutgoingSession() { + private void runOutgoingSession(ContactId contactId) { // Allocate a stream context StreamContext ctx; try { ctx = keyManager.getStreamContext(contactId, transportId); } catch (DbException e) { logException(LOG, WARNING, e); - disposeWriter(true); + onError(true); return; } if (ctx == null) { LOG.warning("Could not allocate stream context"); - disposeWriter(true); + onError(true); return; } try { // Create and run the outgoing session - outgoingSession = createDuplexOutgoingSession(ctx, writer); - outgoingSession.run(); - disposeWriter(false); + SyncSession out = createDuplexOutgoingSession(ctx, writer); + outgoingSession = out; + out.run(); + writer.dispose(false); } catch (IOException e) { logException(LOG, WARNING, e); - disposeWriter(true); + onError(true); } } - private void disposeReader(boolean exception, boolean recognised) { - // Interrupt the outgoing session so it finishes cleanly - if (outgoingSession != null) outgoingSession.interrupt(); - try { - reader.dispose(exception, recognised); - } catch (IOException e) { - logException(LOG, WARNING, e); - } - } - - private void disposeWriter(boolean exception) { - // Interrupt the incoming session if an exception occurred, - // otherwise wait for the end of stream marker - if (exception && incomingSession != null) - incomingSession.interrupt(); - try { - writer.dispose(exception); - } catch (IOException e) { - logException(LOG, WARNING, e); - } + private void onError(boolean recognised) { + disposeOnError(reader, recognised); + disposeOnError(writer); } } @@ -334,15 +345,15 @@ class ConnectionManagerImpl implements ConnectionManager { private final TransportConnectionReader reader; private final TransportConnectionWriter writer; - private volatile SyncSession incomingSession = null; + @Nullable private volatile SyncSession outgoingSession = null; private ManageOutgoingDuplexConnection(ContactId contactId, - TransportId transportId, DuplexTransportConnection transport) { + TransportId transportId, DuplexTransportConnection connection) { this.contactId = contactId; this.transportId = transportId; - reader = transport.getReader(); - writer = transport.getWriter(); + reader = connection.getReader(); + writer = connection.getWriter(); } @Override @@ -353,24 +364,31 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(contactId, transportId); } catch (DbException e) { logException(LOG, WARNING, e); - disposeWriter(true); + onError(); return; } if (ctx == null) { LOG.warning("Could not allocate stream context"); - disposeWriter(true); + onError(); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Cannot use handshake mode stream context"); + onError(); return; } // Start the incoming session on another thread ioExecutor.execute(this::runIncomingSession); try { // Create and run the outgoing session - outgoingSession = createDuplexOutgoingSession(ctx, writer); - outgoingSession.run(); - disposeWriter(false); + SyncSession out = createDuplexOutgoingSession(ctx, writer); + outgoingSession = out; + out.run(); + writer.dispose(false); } catch (IOException e) { logException(LOG, WARNING, e); - disposeWriter(true); + onError(); } } @@ -378,61 +396,59 @@ class ConnectionManagerImpl implements ConnectionManager { // Read and recognise the tag StreamContext ctx; try { - byte[] tag = readTag(reader); + byte[] tag = readTag(reader.getInputStream()); ctx = keyManager.getStreamContext(transportId, tag); } catch (IOException | DbException e) { logException(LOG, WARNING, e); - disposeReader(true, false); + onError(); return; } // Unrecognised tags are suspicious in this case if (ctx == null) { LOG.warning("Unrecognised tag for returning stream"); - disposeReader(true, false); + onError(); return; } // Check that the stream comes from the expected contact - if (!contactId.equals(ctx.getContactId())) { + ContactId inContactId = ctx.getContactId(); + if (inContactId == null) { + LOG.warning("Expected contact tag, got rendezvous tag"); + onError(); + return; + } + if (!contactId.equals(inContactId)) { LOG.warning("Wrong contact ID for returning stream"); - disposeReader(true, true); + onError(); + return; + } + if (ctx.isHandshakeMode()) { + // TODO: Support handshake mode for contacts + LOG.warning("Received handshake tag, expected rotation mode"); + onError(); return; } connectionRegistry.registerConnection(contactId, transportId, false); try { // Create and run the incoming session - incomingSession = createIncomingSession(ctx, reader); - incomingSession.run(); - disposeReader(false, true); + createIncomingSession(ctx, reader).run(); + reader.dispose(false, true); + // Interrupt the outgoing session so it finishes cleanly + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); } catch (IOException e) { logException(LOG, WARNING, e); - disposeReader(true, true); + onError(); } finally { connectionRegistry.unregisterConnection(contactId, transportId, false); } } - private void disposeReader(boolean exception, boolean recognised) { - // Interrupt the outgoing session so it finishes cleanly - if (outgoingSession != null) outgoingSession.interrupt(); - try { - reader.dispose(exception, recognised); - } catch (IOException e) { - logException(LOG, WARNING, e); - } - } - - private void disposeWriter(boolean exception) { - // Interrupt the incoming session if an exception occurred, - // otherwise wait for the end of stream marker - if (exception && incomingSession != null) - incomingSession.interrupt(); - try { - writer.dispose(exception); - } catch (IOException e) { - logException(LOG, WARNING, e); - } + private void onError() { + // 'Recognised' is always true for outgoing connections + disposeOnError(reader, true); + disposeOnError(writer); } } } From aa0c3118a009028df235bac31088e7403e6cb805 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 28 May 2019 10:44:35 +0100 Subject: [PATCH 2/3] Interrupt outgoing session on read error. --- .../bramble/plugin/ConnectionManagerImpl.java | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index eab0d11e2..b2df9f3bc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -266,24 +266,24 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(transportId, tag); } catch (IOException | DbException e) { logException(LOG, WARNING, e); - onError(false); + onReadError(false); return; } if (ctx == null) { LOG.info("Unrecognised tag"); - onError(false); + onReadError(false); return; } ContactId contactId = ctx.getContactId(); if (contactId == null) { LOG.warning("Expected contact tag, got rendezvous tag"); - onError(true); + onReadError(true); return; } if (ctx.isHandshakeMode()) { // TODO: Support handshake mode for contacts LOG.warning("Received handshake tag, expected rotation mode"); - onError(true); + onReadError(true); return; } connectionRegistry.registerConnection(contactId, transportId, true); @@ -298,7 +298,7 @@ class ConnectionManagerImpl implements ConnectionManager { if (out != null) out.interrupt(); } catch (IOException e) { logException(LOG, WARNING, e); - onError(true); + onReadError(true); } finally { connectionRegistry.unregisterConnection(contactId, transportId, true); @@ -312,12 +312,12 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(contactId, transportId); } catch (DbException e) { logException(LOG, WARNING, e); - onError(true); + onWriteError(); return; } if (ctx == null) { LOG.warning("Could not allocate stream context"); - onError(true); + onWriteError(); return; } try { @@ -328,13 +328,21 @@ class ConnectionManagerImpl implements ConnectionManager { writer.dispose(false); } catch (IOException e) { logException(LOG, WARNING, e); - onError(true); + onWriteError(); } } - private void onError(boolean recognised) { + private void onReadError(boolean recognised) { disposeOnError(reader, recognised); disposeOnError(writer); + // Interrupt the outgoing session so it finishes + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); + } + + private void onWriteError() { + disposeOnError(reader, true); + disposeOnError(writer); } } @@ -364,18 +372,18 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(contactId, transportId); } catch (DbException e) { logException(LOG, WARNING, e); - onError(); + onWriteError(); return; } if (ctx == null) { LOG.warning("Could not allocate stream context"); - onError(); + onWriteError(); return; } if (ctx.isHandshakeMode()) { // TODO: Support handshake mode for contacts LOG.warning("Cannot use handshake mode stream context"); - onError(); + onWriteError(); return; } // Start the incoming session on another thread @@ -388,7 +396,7 @@ class ConnectionManagerImpl implements ConnectionManager { writer.dispose(false); } catch (IOException e) { logException(LOG, WARNING, e); - onError(); + onWriteError(); } } @@ -400,31 +408,31 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(transportId, tag); } catch (IOException | DbException e) { logException(LOG, WARNING, e); - onError(); + onReadError(); return; } // Unrecognised tags are suspicious in this case if (ctx == null) { LOG.warning("Unrecognised tag for returning stream"); - onError(); + onReadError(); return; } // Check that the stream comes from the expected contact ContactId inContactId = ctx.getContactId(); if (inContactId == null) { LOG.warning("Expected contact tag, got rendezvous tag"); - onError(); + onReadError(); return; } if (!contactId.equals(inContactId)) { LOG.warning("Wrong contact ID for returning stream"); - onError(); + onReadError(); return; } if (ctx.isHandshakeMode()) { // TODO: Support handshake mode for contacts LOG.warning("Received handshake tag, expected rotation mode"); - onError(); + onReadError(); return; } connectionRegistry.registerConnection(contactId, transportId, @@ -435,20 +443,29 @@ class ConnectionManagerImpl implements ConnectionManager { reader.dispose(false, true); // Interrupt the outgoing session so it finishes cleanly SyncSession out = outgoingSession; + outgoingSession = null; if (out != null) out.interrupt(); } catch (IOException e) { logException(LOG, WARNING, e); - onError(); + onReadError(); } finally { connectionRegistry.unregisterConnection(contactId, transportId, false); } } - private void onError() { + private void onReadError() { // 'Recognised' is always true for outgoing connections disposeOnError(reader, true); disposeOnError(writer); + // Interrupt the outgoing session so it finishes + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); + } + + private void onWriteError() { + disposeOnError(reader, true); + disposeOnError(writer); } } } From 829a6df56760add96ac4a716592e6acc7c7566e7 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 28 May 2019 14:14:52 +0100 Subject: [PATCH 3/3] Remove redundant assignment. --- .../org/briarproject/bramble/plugin/ConnectionManagerImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index b2df9f3bc..7cece262b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -443,7 +443,6 @@ class ConnectionManagerImpl implements ConnectionManager { reader.dispose(false, true); // Interrupt the outgoing session so it finishes cleanly SyncSession out = outgoingSession; - outgoingSession = null; if (out != null) out.interrupt(); } catch (IOException e) { logException(LOG, WARNING, e);