Ensure connections are closed when contact is removed.

This commit is contained in:
akwizgran
2018-02-27 17:18:29 +00:00
parent 692db742cf
commit 53b38d83e8

View File

@@ -247,12 +247,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
dispose(true, false);
return;
}
if (ctx == null) {
LOG.info("Unrecognised tag");
disposeReader(false, false);
dispose(false, false);
return;
}
contactId = ctx.getContactId();
@@ -263,10 +263,10 @@ class ConnectionManagerImpl implements ConnectionManager {
// Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader);
incomingSession.run();
disposeReader(false, true);
dispose(false, true);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
dispose(true, true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
true);
@@ -280,39 +280,28 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
dispose(true, true);
return;
}
if (ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
dispose(true, true);
return;
}
try {
// Create and run the outgoing session
outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run();
disposeWriter(false);
dispose(false, true);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
dispose(true, true);
}
}
private void disposeReader(boolean exception, boolean recognised) {
if (exception && outgoingSession != null)
outgoingSession.interrupt();
private void dispose(boolean exception, boolean recognised) {
try {
reader.dispose(exception, recognised);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if (exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -346,12 +335,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
dispose(true);
return;
}
if (ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
dispose(true);
return;
}
// Start the incoming session on another thread
@@ -360,10 +349,10 @@ class ConnectionManagerImpl implements ConnectionManager {
// Create and run the outgoing session
outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run();
disposeWriter(false);
dispose(false);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
dispose(true);
}
}
@@ -375,19 +364,19 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
dispose(true);
return;
}
// Unrecognised tags are suspicious in this case
if (ctx == null) {
LOG.warning("Unrecognised tag for returning stream");
disposeReader(true, false);
dispose(true);
return;
}
// Check that the stream comes from the expected contact
if (!ctx.getContactId().equals(contactId)) {
LOG.warning("Wrong contact ID for returning stream");
disposeReader(true, true);
dispose(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId,
@@ -396,30 +385,20 @@ class ConnectionManagerImpl implements ConnectionManager {
// Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader);
incomingSession.run();
disposeReader(false, true);
dispose(false);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
dispose(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
false);
}
}
private void disposeReader(boolean exception, boolean recognised) {
if (exception && outgoingSession != null)
outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if (exception && incomingSession != null)
incomingSession.interrupt();
private void dispose(boolean exception) {
try {
// 'Recognised' is always true because we opened the connection
reader.dispose(exception, true);
writer.dispose(exception);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);