Interrupt outgoing session on read error.

This commit is contained in:
akwizgran
2019-05-28 10:44:35 +01:00
parent 717b2d176e
commit aa0c3118a0

View File

@@ -266,24 +266,24 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(transportId, tag); ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) { } catch (IOException | DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(false); onReadError(false);
return; return;
} }
if (ctx == null) { if (ctx == null) {
LOG.info("Unrecognised tag"); LOG.info("Unrecognised tag");
onError(false); onReadError(false);
return; return;
} }
ContactId contactId = ctx.getContactId(); ContactId contactId = ctx.getContactId();
if (contactId == null) { if (contactId == null) {
LOG.warning("Expected contact tag, got rendezvous tag"); LOG.warning("Expected contact tag, got rendezvous tag");
onError(true); onReadError(true);
return; return;
} }
if (ctx.isHandshakeMode()) { if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts // TODO: Support handshake mode for contacts
LOG.warning("Received handshake tag, expected rotation mode"); LOG.warning("Received handshake tag, expected rotation mode");
onError(true); onReadError(true);
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId, true); connectionRegistry.registerConnection(contactId, transportId, true);
@@ -298,7 +298,7 @@ class ConnectionManagerImpl implements ConnectionManager {
if (out != null) out.interrupt(); if (out != null) out.interrupt();
} catch (IOException e) { } catch (IOException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(true); onReadError(true);
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId, connectionRegistry.unregisterConnection(contactId, transportId,
true); true);
@@ -312,12 +312,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId); ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(true); onWriteError();
return; return;
} }
if (ctx == null) { if (ctx == null) {
LOG.warning("Could not allocate stream context"); LOG.warning("Could not allocate stream context");
onError(true); onWriteError();
return; return;
} }
try { try {
@@ -328,13 +328,21 @@ class ConnectionManagerImpl implements ConnectionManager {
writer.dispose(false); writer.dispose(false);
} catch (IOException e) { } catch (IOException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(true); onWriteError();
} }
} }
private void onError(boolean recognised) { private void onReadError(boolean recognised) {
disposeOnError(reader, recognised); disposeOnError(reader, recognised);
disposeOnError(writer); disposeOnError(writer);
// Interrupt the outgoing session so it finishes
SyncSession out = outgoingSession;
if (out != null) out.interrupt();
}
private void onWriteError() {
disposeOnError(reader, true);
disposeOnError(writer);
} }
} }
@@ -364,18 +372,18 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId); ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(); onWriteError();
return; return;
} }
if (ctx == null) { if (ctx == null) {
LOG.warning("Could not allocate stream context"); LOG.warning("Could not allocate stream context");
onError(); onWriteError();
return; return;
} }
if (ctx.isHandshakeMode()) { if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts // TODO: Support handshake mode for contacts
LOG.warning("Cannot use handshake mode stream context"); LOG.warning("Cannot use handshake mode stream context");
onError(); onWriteError();
return; return;
} }
// Start the incoming session on another thread // Start the incoming session on another thread
@@ -388,7 +396,7 @@ class ConnectionManagerImpl implements ConnectionManager {
writer.dispose(false); writer.dispose(false);
} catch (IOException e) { } catch (IOException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(); onWriteError();
} }
} }
@@ -400,31 +408,31 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(transportId, tag); ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) { } catch (IOException | DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(); onReadError();
return; return;
} }
// Unrecognised tags are suspicious in this case // Unrecognised tags are suspicious in this case
if (ctx == null) { if (ctx == null) {
LOG.warning("Unrecognised tag for returning stream"); LOG.warning("Unrecognised tag for returning stream");
onError(); onReadError();
return; return;
} }
// Check that the stream comes from the expected contact // Check that the stream comes from the expected contact
ContactId inContactId = ctx.getContactId(); ContactId inContactId = ctx.getContactId();
if (inContactId == null) { if (inContactId == null) {
LOG.warning("Expected contact tag, got rendezvous tag"); LOG.warning("Expected contact tag, got rendezvous tag");
onError(); onReadError();
return; return;
} }
if (!contactId.equals(inContactId)) { if (!contactId.equals(inContactId)) {
LOG.warning("Wrong contact ID for returning stream"); LOG.warning("Wrong contact ID for returning stream");
onError(); onReadError();
return; return;
} }
if (ctx.isHandshakeMode()) { if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts // TODO: Support handshake mode for contacts
LOG.warning("Received handshake tag, expected rotation mode"); LOG.warning("Received handshake tag, expected rotation mode");
onError(); onReadError();
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId, connectionRegistry.registerConnection(contactId, transportId,
@@ -435,20 +443,29 @@ class ConnectionManagerImpl implements ConnectionManager {
reader.dispose(false, true); reader.dispose(false, true);
// Interrupt the outgoing session so it finishes cleanly // Interrupt the outgoing session so it finishes cleanly
SyncSession out = outgoingSession; SyncSession out = outgoingSession;
outgoingSession = null;
if (out != null) out.interrupt(); if (out != null) out.interrupt();
} catch (IOException e) { } catch (IOException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onError(); onReadError();
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId, connectionRegistry.unregisterConnection(contactId, transportId,
false); false);
} }
} }
private void onError() { private void onReadError() {
// 'Recognised' is always true for outgoing connections // 'Recognised' is always true for outgoing connections
disposeOnError(reader, true); disposeOnError(reader, true);
disposeOnError(writer); disposeOnError(writer);
// Interrupt the outgoing session so it finishes
SyncSession out = outgoingSession;
if (out != null) out.interrupt();
}
private void onWriteError() {
disposeOnError(reader, true);
disposeOnError(writer);
} }
} }
} }