Interrupt the other side of a duplex connection if an exception occurs.

This commit is contained in:
akwizgran
2014-11-05 18:28:05 +00:00
parent 33c3eb7308
commit 5b8eab6035
2 changed files with 218 additions and 132 deletions

View File

@@ -25,8 +25,8 @@ public interface TransportConnectionReader {
* the connection has been marked as closed.
* @param exception true if the connection is being closed because of an
* exception. This may affect how resources are disposed of.
* @param recognised true if the pseudo-random tag was recognised. This may
* affect how resources are disposed of.
* @param recognised true if the connection is definitely a Briar transport
* connection. This may affect how resources are disposed of.
*/
void dispose(boolean exception, boolean recognised) throws IOException;
}

View File

@@ -69,88 +69,18 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
ioExecutor.execute(new DispatchOutgoingDuplexConnection(c, t, d));
}
private StreamContext readAndRecogniseTag(TransportId t,
TransportConnectionReader r) {
private byte[] readTag(TransportId t, TransportConnectionReader r)
throws IOException {
// Read the tag
byte[] tag = new byte[TAG_LENGTH];
try {
InputStream in = r.getInputStream();
int offset = 0;
while(offset < tag.length) {
int read = in.read(tag, offset, tag.length - offset);
if(read == -1) throw new EOFException();
offset += read;
}
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(r, true, false);
return null;
}
// Recognise the tag
StreamContext ctx = null;
try {
ctx = tagRecogniser.recogniseTag(t, tag);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(r, true, false);
return null;
}
if(ctx == null) dispose(r, false, false);
return ctx;
}
private void runAndDispose(StreamContext ctx, TransportConnectionReader r) {
MessagingSession in =
messagingSessionFactory.createIncomingSession(ctx, r);
ContactId contactId = ctx.getContactId();
TransportId transportId = ctx.getTransportId();
connectionRegistry.registerConnection(contactId, transportId);
try {
in.run();
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(r, true, true);
return;
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
dispose(r, false, true);
}
private void dispose(TransportConnectionReader r, boolean exception,
boolean recognised) {
try {
r.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void runAndDispose(StreamContext ctx, TransportConnectionWriter w,
boolean duplex) {
MessagingSession out =
messagingSessionFactory.createOutgoingSession(ctx, w, duplex);
ContactId contactId = ctx.getContactId();
TransportId transportId = ctx.getTransportId();
connectionRegistry.registerConnection(contactId, transportId);
try {
out.run();
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(w, true);
return;
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
dispose(w, false);
}
private void dispose(TransportConnectionWriter w, boolean exception) {
try {
w.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
InputStream in = r.getInputStream();
int offset = 0;
while(offset < tag.length) {
int read = in.read(tag, offset, tag.length - offset);
if(read == -1) throw new EOFException();
offset += read;
}
return tag;
}
private class DispatchIncomingSimplexConnection implements Runnable {
@@ -166,10 +96,46 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
public void run() {
// Read and recognise the tag
StreamContext ctx = readAndRecogniseTag(transportId, reader);
if(ctx == null) return;
StreamContext ctx;
try {
byte[] tag = readTag(transportId, reader);
ctx = tagRecogniser.recogniseTag(transportId, tag);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
}
if(ctx == null) {
LOG.info("Unrecognised tag");
disposeReader(true, false);
return;
}
ContactId contactId = ctx.getContactId();
connectionRegistry.registerConnection(contactId, transportId);
// Run the incoming session
runAndDispose(ctx, reader);
MessagingSession incomingSession =
messagingSessionFactory.createIncomingSession(ctx, reader);
try {
incomingSession.run();
disposeReader(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
private void disposeReader(boolean exception, boolean recognised) {
try {
reader.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
@@ -191,11 +157,32 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
dispose(writer, false);
LOG.warning("Could not allocate stream context");
disposeWriter(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId);
// Run the outgoing session
runAndDispose(ctx, writer, false);
MessagingSession outgoingSession =
messagingSessionFactory.createOutgoingSession(ctx,
writer, false);
try {
outgoingSession.run();
disposeWriter(false);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
private void disposeWriter(boolean exception) {
try {
writer.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
@@ -205,6 +192,10 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private volatile ContactId contactId = null;
private volatile MessagingSession incomingSession = null;
private volatile MessagingSession outgoingSession = null;
private DispatchIncomingDuplexConnection(TransportId transportId,
DuplexTransportConnection transport) {
this.transportId = transportId;
@@ -214,39 +205,85 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
public void run() {
// Read and recognise the tag
StreamContext ctx = readAndRecogniseTag(transportId, reader);
if(ctx == null) return;
StreamContext ctx;
try {
byte[] tag = readTag(transportId, reader);
ctx = tagRecogniser.recogniseTag(transportId, tag);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
}
if(ctx == null) {
LOG.info("Unrecognised tag");
disposeReader(true, false);
return;
}
contactId = ctx.getContactId();
connectionRegistry.registerConnection(contactId, transportId);
// Start the outgoing session on another thread
ioExecutor.execute(new DispatchIncomingDuplexConnectionSide2(
ctx.getContactId(), transportId, writer));
ioExecutor.execute(new Runnable() {
public void run() {
runOutgoingSession();
}
});
// Run the incoming session
runAndDispose(ctx, reader);
}
}
private class DispatchIncomingDuplexConnectionSide2 implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionWriter writer;
private DispatchIncomingDuplexConnectionSide2(ContactId contactId,
TransportId transportId, TransportConnectionWriter writer) {
this.contactId = contactId;
this.transportId = transportId;
this.writer = writer;
incomingSession = messagingSessionFactory.createIncomingSession(ctx,
reader);
try {
incomingSession.run();
disposeReader(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
public void run() {
private void runOutgoingSession() {
// Allocate a stream context
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
dispose(writer, false);
LOG.warning("Could not allocate stream context");
disposeWriter(true);
return;
}
// Run the outgoing session
runAndDispose(ctx, writer, true);
outgoingSession = messagingSessionFactory.createOutgoingSession(ctx,
writer, true);
try {
outgoingSession.run();
disposeWriter(false);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
}
}
private void disposeReader(boolean exception, boolean recognised) {
if(exception && outgoingSession != null)
outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if(exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
@@ -257,6 +294,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private volatile MessagingSession incomingSession = null;
private volatile MessagingSession outgoingSession = null;
private DispatchOutgoingDuplexConnection(ContactId contactId,
TransportId transportId, DuplexTransportConnection transport) {
this.contactId = contactId;
@@ -270,42 +310,88 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
dispose(writer, false);
LOG.warning("Could not allocate stream context");
disposeWriter(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId);
// Start the incoming session on another thread
ioExecutor.execute(new DispatchOutgoingDuplexConnectionSide2(
contactId, transportId, reader));
ioExecutor.execute(new Runnable() {
public void run() {
runIncomingSession();
}
});
// Run the outgoing session
runAndDispose(ctx, writer, true);
}
}
private class DispatchOutgoingDuplexConnectionSide2 implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionReader reader;
private DispatchOutgoingDuplexConnectionSide2(ContactId contactId,
TransportId transportId, TransportConnectionReader reader) {
this.contactId = contactId;
this.transportId = transportId;
this.reader = reader;
outgoingSession = messagingSessionFactory.createOutgoingSession(ctx,
writer, true);
try {
outgoingSession.run();
disposeWriter(false);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
public void run() {
private void runIncomingSession() {
// Read and recognise the tag
StreamContext ctx = readAndRecogniseTag(transportId, reader);
if(ctx == null) return;
StreamContext ctx;
try {
byte[] tag = readTag(transportId, reader);
ctx = tagRecogniser.recogniseTag(transportId, tag);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
return;
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
return;
}
// Unrecognised tags are suspicious in this case
if(ctx == null) {
LOG.warning("Unrecognised tag for returning stream");
disposeReader(true, true);
return;
}
// Check that the stream comes from the expected contact
if(!ctx.getContactId().equals(contactId)) {
LOG.warning("Wrong contact ID for duplex connection");
dispose(reader, true, true);
LOG.warning("Wrong contact ID for returning stream");
disposeReader(true, true);
return;
}
// Run the incoming session
runAndDispose(ctx, reader);
incomingSession = messagingSessionFactory.createIncomingSession(ctx,
reader);
try {
incomingSession.run();
disposeReader(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
}
}
private void disposeReader(boolean exception, boolean recognised) {
if(exception && outgoingSession != null)
outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if(exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
}