diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index eab0d11e2..b2df9f3bc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -266,24 +266,24 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(transportId, tag); } catch (IOException | DbException e) { logException(LOG, WARNING, e); - onError(false); + onReadError(false); return; } if (ctx == null) { LOG.info("Unrecognised tag"); - onError(false); + onReadError(false); return; } ContactId contactId = ctx.getContactId(); if (contactId == null) { LOG.warning("Expected contact tag, got rendezvous tag"); - onError(true); + onReadError(true); return; } if (ctx.isHandshakeMode()) { // TODO: Support handshake mode for contacts LOG.warning("Received handshake tag, expected rotation mode"); - onError(true); + onReadError(true); return; } connectionRegistry.registerConnection(contactId, transportId, true); @@ -298,7 +298,7 @@ class ConnectionManagerImpl implements ConnectionManager { if (out != null) out.interrupt(); } catch (IOException e) { logException(LOG, WARNING, e); - onError(true); + onReadError(true); } finally { connectionRegistry.unregisterConnection(contactId, transportId, true); @@ -312,12 +312,12 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(contactId, transportId); } catch (DbException e) { logException(LOG, WARNING, e); - onError(true); + onWriteError(); return; } if (ctx == null) { LOG.warning("Could not allocate stream context"); - onError(true); + onWriteError(); return; } try { @@ -328,13 +328,21 @@ class ConnectionManagerImpl implements ConnectionManager { writer.dispose(false); } catch (IOException e) { logException(LOG, WARNING, e); - onError(true); + onWriteError(); } } - private void onError(boolean recognised) { + private void onReadError(boolean recognised) { disposeOnError(reader, recognised); disposeOnError(writer); + // Interrupt the outgoing session so it finishes + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); + } + + private void onWriteError() { + disposeOnError(reader, true); + disposeOnError(writer); } } @@ -364,18 +372,18 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(contactId, transportId); } catch (DbException e) { logException(LOG, WARNING, e); - onError(); + onWriteError(); return; } if (ctx == null) { LOG.warning("Could not allocate stream context"); - onError(); + onWriteError(); return; } if (ctx.isHandshakeMode()) { // TODO: Support handshake mode for contacts LOG.warning("Cannot use handshake mode stream context"); - onError(); + onWriteError(); return; } // Start the incoming session on another thread @@ -388,7 +396,7 @@ class ConnectionManagerImpl implements ConnectionManager { writer.dispose(false); } catch (IOException e) { logException(LOG, WARNING, e); - onError(); + onWriteError(); } } @@ -400,31 +408,31 @@ class ConnectionManagerImpl implements ConnectionManager { ctx = keyManager.getStreamContext(transportId, tag); } catch (IOException | DbException e) { logException(LOG, WARNING, e); - onError(); + onReadError(); return; } // Unrecognised tags are suspicious in this case if (ctx == null) { LOG.warning("Unrecognised tag for returning stream"); - onError(); + onReadError(); return; } // Check that the stream comes from the expected contact ContactId inContactId = ctx.getContactId(); if (inContactId == null) { LOG.warning("Expected contact tag, got rendezvous tag"); - onError(); + onReadError(); return; } if (!contactId.equals(inContactId)) { LOG.warning("Wrong contact ID for returning stream"); - onError(); + onReadError(); return; } if (ctx.isHandshakeMode()) { // TODO: Support handshake mode for contacts LOG.warning("Received handshake tag, expected rotation mode"); - onError(); + onReadError(); return; } connectionRegistry.registerConnection(contactId, transportId, @@ -435,20 +443,29 @@ class ConnectionManagerImpl implements ConnectionManager { reader.dispose(false, true); // Interrupt the outgoing session so it finishes cleanly SyncSession out = outgoingSession; + outgoingSession = null; if (out != null) out.interrupt(); } catch (IOException e) { logException(LOG, WARNING, e); - onError(); + onReadError(); } finally { connectionRegistry.unregisterConnection(contactId, transportId, false); } } - private void onError() { + private void onReadError() { // 'Recognised' is always true for outgoing connections disposeOnError(reader, true); disposeOnError(writer); + // Interrupt the outgoing session so it finishes + SyncSession out = outgoingSession; + if (out != null) out.interrupt(); + } + + private void onWriteError() { + disposeOnError(reader, true); + disposeOnError(writer); } } }