Merge branch '1571-connection-manager-cleanup' into 'master'

Clean up connection manager, ready for pending contacts

See merge request briar/briar!1109
This commit is contained in:
akwizgran
2019-05-29 15:16:35 +00:00

View File

@@ -3,6 +3,7 @@ package org.briarproject.bramble.plugin;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
@@ -17,22 +18,26 @@ import org.briarproject.bramble.api.transport.StreamReaderFactory;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
import static org.briarproject.bramble.util.IoUtils.read;
import static org.briarproject.bramble.util.LogUtils.logException;
@NotNullByDefault
class ConnectionManagerImpl implements ConnectionManager {
private static final Logger LOG =
Logger.getLogger(ConnectionManagerImpl.class.getName());
getLogger(ConnectionManagerImpl.class.getName());
private final Executor ioExecutor;
private final KeyManager keyManager;
@@ -79,16 +84,9 @@ class ConnectionManagerImpl implements ConnectionManager {
ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d));
}
private byte[] readTag(TransportConnectionReader r) throws IOException {
// Read the tag
private byte[] readTag(InputStream in) throws IOException {
byte[] tag = new byte[TAG_LENGTH];
InputStream in = r.getInputStream();
int offset = 0;
while (offset < tag.length) {
int read = in.read(tag, offset, tag.length - offset);
if (read == -1) throw new EOFException();
offset += read;
}
read(in, tag);
return tag;
}
@@ -96,28 +94,43 @@ class ConnectionManagerImpl implements ConnectionManager {
TransportConnectionReader r) throws IOException {
InputStream streamReader = streamReaderFactory.createStreamReader(
r.getInputStream(), ctx);
// TODO: Pending contacts, handshake mode
return syncSessionFactory.createIncomingSession(ctx.getContactId(),
streamReader);
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createIncomingSession(c, streamReader);
}
private SyncSession createSimplexOutgoingSession(StreamContext ctx,
TransportConnectionWriter w) throws IOException {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx);
// TODO: Pending contacts, handshake mode
return syncSessionFactory.createSimplexOutgoingSession(
ctx.getContactId(), w.getMaxLatency(), streamWriter);
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createSimplexOutgoingSession(c,
w.getMaxLatency(), streamWriter);
}
private SyncSession createDuplexOutgoingSession(StreamContext ctx,
TransportConnectionWriter w) throws IOException {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx);
// TODO: Pending contacts, handshake mode
return syncSessionFactory.createDuplexOutgoingSession(
ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(),
streamWriter);
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createDuplexOutgoingSession(c,
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter);
}
private void disposeOnError(TransportConnectionReader reader,
boolean recognised) {
try {
reader.dispose(true, recognised);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
}
private void disposeOnError(TransportConnectionWriter writer) {
try {
writer.dispose(true);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
}
private class ManageIncomingSimplexConnection implements Runnable {
@@ -136,40 +149,46 @@ class ConnectionManagerImpl implements ConnectionManager {
// Read and recognise the tag
StreamContext ctx;
try {
byte[] tag = readTag(reader);
byte[] tag = readTag(reader.getInputStream());
ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
disposeReader(true, false);
onError(false);
return;
}
if (ctx == null) {
LOG.info("Unrecognised tag");
disposeReader(false, false);
onError(false);
return;
}
// TODO: Pending contacts
ContactId contactId = ctx.getContactId();
if (contactId == null) {
LOG.warning("Received rendezvous stream, expected contact");
onError(true);
return;
}
if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts
LOG.warning("Received handshake tag, expected rotation mode");
onError(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId, true);
try {
// Create and run the incoming session
createIncomingSession(ctx, reader).run();
disposeReader(false, true);
reader.dispose(false, true);
} catch (IOException e) {
logException(LOG, WARNING, e);
disposeReader(true, true);
onError(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
true);
}
}
private void disposeReader(boolean exception, boolean recognised) {
try {
reader.dispose(exception, recognised);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
private void onError(boolean recognised) {
disposeOnError(reader, recognised);
}
}
@@ -194,12 +213,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) {
logException(LOG, WARNING, e);
disposeWriter(true);
onError();
return;
}
if (ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
onError();
return;
}
connectionRegistry.registerConnection(contactId, transportId,
@@ -207,22 +226,18 @@ class ConnectionManagerImpl implements ConnectionManager {
try {
// Create and run the outgoing session
createSimplexOutgoingSession(ctx, writer).run();
disposeWriter(false);
writer.dispose(false);
} catch (IOException e) {
logException(LOG, WARNING, e);
disposeWriter(true);
onError();
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
false);
}
}
private void disposeWriter(boolean exception) {
try {
writer.dispose(exception);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
private void onError() {
disposeOnError(writer);
}
}
@@ -232,15 +247,14 @@ class ConnectionManagerImpl implements ConnectionManager {
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private volatile ContactId contactId = null;
private volatile SyncSession incomingSession = null;
@Nullable
private volatile SyncSession outgoingSession = null;
private ManageIncomingDuplexConnection(TransportId transportId,
DuplexTransportConnection transport) {
DuplexTransportConnection connection) {
this.transportId = transportId;
reader = transport.getReader();
writer = transport.getWriter();
reader = connection.getReader();
writer = connection.getWriter();
}
@Override
@@ -248,82 +262,87 @@ class ConnectionManagerImpl implements ConnectionManager {
// Read and recognise the tag
StreamContext ctx;
try {
byte[] tag = readTag(reader);
byte[] tag = readTag(reader.getInputStream());
ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
disposeReader(true, false);
onReadError(false);
return;
}
if (ctx == null) {
LOG.info("Unrecognised tag");
disposeReader(false, false);
onReadError(false);
return;
}
ContactId contactId = ctx.getContactId();
if (contactId == null) {
LOG.warning("Expected contact tag, got rendezvous tag");
onReadError(true);
return;
}
if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts
LOG.warning("Received handshake tag, expected rotation mode");
onReadError(true);
return;
}
contactId = ctx.getContactId();
connectionRegistry.registerConnection(contactId, transportId, true);
// Start the outgoing session on another thread
ioExecutor.execute(this::runOutgoingSession);
ioExecutor.execute(() -> runOutgoingSession(contactId));
try {
// Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader);
incomingSession.run();
disposeReader(false, true);
createIncomingSession(ctx, reader).run();
reader.dispose(false, true);
// Interrupt the outgoing session so it finishes cleanly
SyncSession out = outgoingSession;
if (out != null) out.interrupt();
} catch (IOException e) {
logException(LOG, WARNING, e);
disposeReader(true, true);
onReadError(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
true);
}
}
private void runOutgoingSession() {
private void runOutgoingSession(ContactId contactId) {
// Allocate a stream context
StreamContext ctx;
try {
ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) {
logException(LOG, WARNING, e);
disposeWriter(true);
onWriteError();
return;
}
if (ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
onWriteError();
return;
}
try {
// Create and run the outgoing session
outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run();
disposeWriter(false);
SyncSession out = createDuplexOutgoingSession(ctx, writer);
outgoingSession = out;
out.run();
writer.dispose(false);
} catch (IOException e) {
logException(LOG, WARNING, e);
disposeWriter(true);
onWriteError();
}
}
private void disposeReader(boolean exception, boolean recognised) {
// Interrupt the outgoing session so it finishes cleanly
if (outgoingSession != null) outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
private void onReadError(boolean recognised) {
disposeOnError(reader, recognised);
disposeOnError(writer);
// Interrupt the outgoing session so it finishes
SyncSession out = outgoingSession;
if (out != null) out.interrupt();
}
private void disposeWriter(boolean exception) {
// Interrupt the incoming session if an exception occurred,
// otherwise wait for the end of stream marker
if (exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
private void onWriteError() {
disposeOnError(reader, true);
disposeOnError(writer);
}
}
@@ -334,15 +353,15 @@ class ConnectionManagerImpl implements ConnectionManager {
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private volatile SyncSession incomingSession = null;
@Nullable
private volatile SyncSession outgoingSession = null;
private ManageOutgoingDuplexConnection(ContactId contactId,
TransportId transportId, DuplexTransportConnection transport) {
TransportId transportId, DuplexTransportConnection connection) {
this.contactId = contactId;
this.transportId = transportId;
reader = transport.getReader();
writer = transport.getWriter();
reader = connection.getReader();
writer = connection.getWriter();
}
@Override
@@ -353,24 +372,31 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) {
logException(LOG, WARNING, e);
disposeWriter(true);
onWriteError();
return;
}
if (ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
onWriteError();
return;
}
if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts
LOG.warning("Cannot use handshake mode stream context");
onWriteError();
return;
}
// Start the incoming session on another thread
ioExecutor.execute(this::runIncomingSession);
try {
// Create and run the outgoing session
outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run();
disposeWriter(false);
SyncSession out = createDuplexOutgoingSession(ctx, writer);
outgoingSession = out;
out.run();
writer.dispose(false);
} catch (IOException e) {
logException(LOG, WARNING, e);
disposeWriter(true);
onWriteError();
}
}
@@ -378,61 +404,67 @@ class ConnectionManagerImpl implements ConnectionManager {
// Read and recognise the tag
StreamContext ctx;
try {
byte[] tag = readTag(reader);
byte[] tag = readTag(reader.getInputStream());
ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
disposeReader(true, false);
onReadError();
return;
}
// Unrecognised tags are suspicious in this case
if (ctx == null) {
LOG.warning("Unrecognised tag for returning stream");
disposeReader(true, false);
onReadError();
return;
}
// Check that the stream comes from the expected contact
if (!contactId.equals(ctx.getContactId())) {
ContactId inContactId = ctx.getContactId();
if (inContactId == null) {
LOG.warning("Expected contact tag, got rendezvous tag");
onReadError();
return;
}
if (!contactId.equals(inContactId)) {
LOG.warning("Wrong contact ID for returning stream");
disposeReader(true, true);
onReadError();
return;
}
if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts
LOG.warning("Received handshake tag, expected rotation mode");
onReadError();
return;
}
connectionRegistry.registerConnection(contactId, transportId,
false);
try {
// Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader);
incomingSession.run();
disposeReader(false, true);
createIncomingSession(ctx, reader).run();
reader.dispose(false, true);
// Interrupt the outgoing session so it finishes cleanly
SyncSession out = outgoingSession;
if (out != null) out.interrupt();
} catch (IOException e) {
logException(LOG, WARNING, e);
disposeReader(true, true);
onReadError();
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
false);
}
}
private void disposeReader(boolean exception, boolean recognised) {
// Interrupt the outgoing session so it finishes cleanly
if (outgoingSession != null) outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
private void onReadError() {
// 'Recognised' is always true for outgoing connections
disposeOnError(reader, true);
disposeOnError(writer);
// Interrupt the outgoing session so it finishes
SyncSession out = outgoingSession;
if (out != null) out.interrupt();
}
private void disposeWriter(boolean exception) {
// Interrupt the incoming session if an exception occurred,
// otherwise wait for the end of stream marker
if (exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
private void onWriteError() {
disposeOnError(reader, true);
disposeOnError(writer);
}
}
}