mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-15 20:29:52 +01:00
Clean up connection manager, ready for pending contacts.
This commit is contained in:
@@ -3,6 +3,7 @@ package org.briarproject.bramble.plugin;
|
|||||||
import org.briarproject.bramble.api.contact.ContactId;
|
import org.briarproject.bramble.api.contact.ContactId;
|
||||||
import org.briarproject.bramble.api.db.DbException;
|
import org.briarproject.bramble.api.db.DbException;
|
||||||
import org.briarproject.bramble.api.lifecycle.IoExecutor;
|
import org.briarproject.bramble.api.lifecycle.IoExecutor;
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
import org.briarproject.bramble.api.plugin.ConnectionManager;
|
import org.briarproject.bramble.api.plugin.ConnectionManager;
|
||||||
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
|
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
|
||||||
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
||||||
@@ -17,22 +18,26 @@ import org.briarproject.bramble.api.transport.StreamReaderFactory;
|
|||||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||||
import org.briarproject.bramble.api.transport.StreamWriterFactory;
|
import org.briarproject.bramble.api.transport.StreamWriterFactory;
|
||||||
|
|
||||||
import java.io.EOFException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
import static java.util.logging.Level.WARNING;
|
import static java.util.logging.Level.WARNING;
|
||||||
|
import static java.util.logging.Logger.getLogger;
|
||||||
|
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
||||||
import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
|
import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
|
||||||
|
import static org.briarproject.bramble.util.IoUtils.read;
|
||||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||||
|
|
||||||
|
@NotNullByDefault
|
||||||
class ConnectionManagerImpl implements ConnectionManager {
|
class ConnectionManagerImpl implements ConnectionManager {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
Logger.getLogger(ConnectionManagerImpl.class.getName());
|
getLogger(ConnectionManagerImpl.class.getName());
|
||||||
|
|
||||||
private final Executor ioExecutor;
|
private final Executor ioExecutor;
|
||||||
private final KeyManager keyManager;
|
private final KeyManager keyManager;
|
||||||
@@ -79,16 +84,9 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d));
|
ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d));
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] readTag(TransportConnectionReader r) throws IOException {
|
private byte[] readTag(InputStream in) throws IOException {
|
||||||
// Read the tag
|
|
||||||
byte[] tag = new byte[TAG_LENGTH];
|
byte[] tag = new byte[TAG_LENGTH];
|
||||||
InputStream in = r.getInputStream();
|
read(in, tag);
|
||||||
int offset = 0;
|
|
||||||
while (offset < tag.length) {
|
|
||||||
int read = in.read(tag, offset, tag.length - offset);
|
|
||||||
if (read == -1) throw new EOFException();
|
|
||||||
offset += read;
|
|
||||||
}
|
|
||||||
return tag;
|
return tag;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,28 +94,43 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
TransportConnectionReader r) throws IOException {
|
TransportConnectionReader r) throws IOException {
|
||||||
InputStream streamReader = streamReaderFactory.createStreamReader(
|
InputStream streamReader = streamReaderFactory.createStreamReader(
|
||||||
r.getInputStream(), ctx);
|
r.getInputStream(), ctx);
|
||||||
// TODO: Pending contacts, handshake mode
|
ContactId c = requireNonNull(ctx.getContactId());
|
||||||
return syncSessionFactory.createIncomingSession(ctx.getContactId(),
|
return syncSessionFactory.createIncomingSession(c, streamReader);
|
||||||
streamReader);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private SyncSession createSimplexOutgoingSession(StreamContext ctx,
|
private SyncSession createSimplexOutgoingSession(StreamContext ctx,
|
||||||
TransportConnectionWriter w) throws IOException {
|
TransportConnectionWriter w) throws IOException {
|
||||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||||
w.getOutputStream(), ctx);
|
w.getOutputStream(), ctx);
|
||||||
// TODO: Pending contacts, handshake mode
|
ContactId c = requireNonNull(ctx.getContactId());
|
||||||
return syncSessionFactory.createSimplexOutgoingSession(
|
return syncSessionFactory.createSimplexOutgoingSession(c,
|
||||||
ctx.getContactId(), w.getMaxLatency(), streamWriter);
|
w.getMaxLatency(), streamWriter);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SyncSession createDuplexOutgoingSession(StreamContext ctx,
|
private SyncSession createDuplexOutgoingSession(StreamContext ctx,
|
||||||
TransportConnectionWriter w) throws IOException {
|
TransportConnectionWriter w) throws IOException {
|
||||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||||
w.getOutputStream(), ctx);
|
w.getOutputStream(), ctx);
|
||||||
// TODO: Pending contacts, handshake mode
|
ContactId c = requireNonNull(ctx.getContactId());
|
||||||
return syncSessionFactory.createDuplexOutgoingSession(
|
return syncSessionFactory.createDuplexOutgoingSession(c,
|
||||||
ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(),
|
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter);
|
||||||
streamWriter);
|
}
|
||||||
|
|
||||||
|
private void disposeOnError(TransportConnectionReader reader,
|
||||||
|
boolean recognised) {
|
||||||
|
try {
|
||||||
|
reader.dispose(true, recognised);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logException(LOG, WARNING, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void disposeOnError(TransportConnectionWriter writer) {
|
||||||
|
try {
|
||||||
|
writer.dispose(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logException(LOG, WARNING, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ManageIncomingSimplexConnection implements Runnable {
|
private class ManageIncomingSimplexConnection implements Runnable {
|
||||||
@@ -136,40 +149,46 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
// Read and recognise the tag
|
// Read and recognise the tag
|
||||||
StreamContext ctx;
|
StreamContext ctx;
|
||||||
try {
|
try {
|
||||||
byte[] tag = readTag(reader);
|
byte[] tag = readTag(reader.getInputStream());
|
||||||
ctx = keyManager.getStreamContext(transportId, tag);
|
ctx = keyManager.getStreamContext(transportId, tag);
|
||||||
} catch (IOException | DbException e) {
|
} catch (IOException | DbException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeReader(true, false);
|
onError(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
LOG.info("Unrecognised tag");
|
LOG.info("Unrecognised tag");
|
||||||
disposeReader(false, false);
|
onError(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// TODO: Pending contacts
|
|
||||||
ContactId contactId = ctx.getContactId();
|
ContactId contactId = ctx.getContactId();
|
||||||
|
if (contactId == null) {
|
||||||
|
LOG.warning("Received rendezvous stream, expected contact");
|
||||||
|
onError(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (ctx.isHandshakeMode()) {
|
||||||
|
// TODO: Support handshake mode for contacts
|
||||||
|
LOG.warning("Received handshake tag, expected rotation mode");
|
||||||
|
onError(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
connectionRegistry.registerConnection(contactId, transportId, true);
|
connectionRegistry.registerConnection(contactId, transportId, true);
|
||||||
try {
|
try {
|
||||||
// Create and run the incoming session
|
// Create and run the incoming session
|
||||||
createIncomingSession(ctx, reader).run();
|
createIncomingSession(ctx, reader).run();
|
||||||
disposeReader(false, true);
|
reader.dispose(false, true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeReader(true, true);
|
onError(true);
|
||||||
} finally {
|
} finally {
|
||||||
connectionRegistry.unregisterConnection(contactId, transportId,
|
connectionRegistry.unregisterConnection(contactId, transportId,
|
||||||
true);
|
true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void disposeReader(boolean exception, boolean recognised) {
|
private void onError(boolean recognised) {
|
||||||
try {
|
disposeOnError(reader, recognised);
|
||||||
reader.dispose(exception, recognised);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logException(LOG, WARNING, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,12 +213,12 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
ctx = keyManager.getStreamContext(contactId, transportId);
|
ctx = keyManager.getStreamContext(contactId, transportId);
|
||||||
} catch (DbException e) {
|
} catch (DbException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeWriter(true);
|
onError();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
LOG.warning("Could not allocate stream context");
|
LOG.warning("Could not allocate stream context");
|
||||||
disposeWriter(true);
|
onError();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
connectionRegistry.registerConnection(contactId, transportId,
|
connectionRegistry.registerConnection(contactId, transportId,
|
||||||
@@ -207,22 +226,18 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
try {
|
try {
|
||||||
// Create and run the outgoing session
|
// Create and run the outgoing session
|
||||||
createSimplexOutgoingSession(ctx, writer).run();
|
createSimplexOutgoingSession(ctx, writer).run();
|
||||||
disposeWriter(false);
|
writer.dispose(false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeWriter(true);
|
onError();
|
||||||
} finally {
|
} finally {
|
||||||
connectionRegistry.unregisterConnection(contactId, transportId,
|
connectionRegistry.unregisterConnection(contactId, transportId,
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void disposeWriter(boolean exception) {
|
private void onError() {
|
||||||
try {
|
disposeOnError(writer);
|
||||||
writer.dispose(exception);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logException(LOG, WARNING, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -232,15 +247,14 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
private final TransportConnectionReader reader;
|
private final TransportConnectionReader reader;
|
||||||
private final TransportConnectionWriter writer;
|
private final TransportConnectionWriter writer;
|
||||||
|
|
||||||
private volatile ContactId contactId = null;
|
@Nullable
|
||||||
private volatile SyncSession incomingSession = null;
|
|
||||||
private volatile SyncSession outgoingSession = null;
|
private volatile SyncSession outgoingSession = null;
|
||||||
|
|
||||||
private ManageIncomingDuplexConnection(TransportId transportId,
|
private ManageIncomingDuplexConnection(TransportId transportId,
|
||||||
DuplexTransportConnection transport) {
|
DuplexTransportConnection connection) {
|
||||||
this.transportId = transportId;
|
this.transportId = transportId;
|
||||||
reader = transport.getReader();
|
reader = connection.getReader();
|
||||||
writer = transport.getWriter();
|
writer = connection.getWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -248,82 +262,79 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
// Read and recognise the tag
|
// Read and recognise the tag
|
||||||
StreamContext ctx;
|
StreamContext ctx;
|
||||||
try {
|
try {
|
||||||
byte[] tag = readTag(reader);
|
byte[] tag = readTag(reader.getInputStream());
|
||||||
ctx = keyManager.getStreamContext(transportId, tag);
|
ctx = keyManager.getStreamContext(transportId, tag);
|
||||||
} catch (IOException | DbException e) {
|
} catch (IOException | DbException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeReader(true, false);
|
onError(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
LOG.info("Unrecognised tag");
|
LOG.info("Unrecognised tag");
|
||||||
disposeReader(false, false);
|
onError(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ContactId contactId = ctx.getContactId();
|
||||||
|
if (contactId == null) {
|
||||||
|
LOG.warning("Expected contact tag, got rendezvous tag");
|
||||||
|
onError(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (ctx.isHandshakeMode()) {
|
||||||
|
// TODO: Support handshake mode for contacts
|
||||||
|
LOG.warning("Received handshake tag, expected rotation mode");
|
||||||
|
onError(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
contactId = ctx.getContactId();
|
|
||||||
connectionRegistry.registerConnection(contactId, transportId, true);
|
connectionRegistry.registerConnection(contactId, transportId, true);
|
||||||
// Start the outgoing session on another thread
|
// Start the outgoing session on another thread
|
||||||
ioExecutor.execute(this::runOutgoingSession);
|
ioExecutor.execute(() -> runOutgoingSession(contactId));
|
||||||
try {
|
try {
|
||||||
// Create and run the incoming session
|
// Create and run the incoming session
|
||||||
incomingSession = createIncomingSession(ctx, reader);
|
createIncomingSession(ctx, reader).run();
|
||||||
incomingSession.run();
|
reader.dispose(false, true);
|
||||||
disposeReader(false, true);
|
// Interrupt the outgoing session so it finishes cleanly
|
||||||
|
SyncSession out = outgoingSession;
|
||||||
|
if (out != null) out.interrupt();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeReader(true, true);
|
onError(true);
|
||||||
} finally {
|
} finally {
|
||||||
connectionRegistry.unregisterConnection(contactId, transportId,
|
connectionRegistry.unregisterConnection(contactId, transportId,
|
||||||
true);
|
true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runOutgoingSession() {
|
private void runOutgoingSession(ContactId contactId) {
|
||||||
// Allocate a stream context
|
// Allocate a stream context
|
||||||
StreamContext ctx;
|
StreamContext ctx;
|
||||||
try {
|
try {
|
||||||
ctx = keyManager.getStreamContext(contactId, transportId);
|
ctx = keyManager.getStreamContext(contactId, transportId);
|
||||||
} catch (DbException e) {
|
} catch (DbException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeWriter(true);
|
onError(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
LOG.warning("Could not allocate stream context");
|
LOG.warning("Could not allocate stream context");
|
||||||
disposeWriter(true);
|
onError(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
// Create and run the outgoing session
|
// Create and run the outgoing session
|
||||||
outgoingSession = createDuplexOutgoingSession(ctx, writer);
|
SyncSession out = createDuplexOutgoingSession(ctx, writer);
|
||||||
outgoingSession.run();
|
outgoingSession = out;
|
||||||
disposeWriter(false);
|
out.run();
|
||||||
|
writer.dispose(false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeWriter(true);
|
onError(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void disposeReader(boolean exception, boolean recognised) {
|
private void onError(boolean recognised) {
|
||||||
// Interrupt the outgoing session so it finishes cleanly
|
disposeOnError(reader, recognised);
|
||||||
if (outgoingSession != null) outgoingSession.interrupt();
|
disposeOnError(writer);
|
||||||
try {
|
|
||||||
reader.dispose(exception, recognised);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logException(LOG, WARNING, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void disposeWriter(boolean exception) {
|
|
||||||
// Interrupt the incoming session if an exception occurred,
|
|
||||||
// otherwise wait for the end of stream marker
|
|
||||||
if (exception && incomingSession != null)
|
|
||||||
incomingSession.interrupt();
|
|
||||||
try {
|
|
||||||
writer.dispose(exception);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logException(LOG, WARNING, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -334,15 +345,15 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
private final TransportConnectionReader reader;
|
private final TransportConnectionReader reader;
|
||||||
private final TransportConnectionWriter writer;
|
private final TransportConnectionWriter writer;
|
||||||
|
|
||||||
private volatile SyncSession incomingSession = null;
|
@Nullable
|
||||||
private volatile SyncSession outgoingSession = null;
|
private volatile SyncSession outgoingSession = null;
|
||||||
|
|
||||||
private ManageOutgoingDuplexConnection(ContactId contactId,
|
private ManageOutgoingDuplexConnection(ContactId contactId,
|
||||||
TransportId transportId, DuplexTransportConnection transport) {
|
TransportId transportId, DuplexTransportConnection connection) {
|
||||||
this.contactId = contactId;
|
this.contactId = contactId;
|
||||||
this.transportId = transportId;
|
this.transportId = transportId;
|
||||||
reader = transport.getReader();
|
reader = connection.getReader();
|
||||||
writer = transport.getWriter();
|
writer = connection.getWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -353,24 +364,31 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
ctx = keyManager.getStreamContext(contactId, transportId);
|
ctx = keyManager.getStreamContext(contactId, transportId);
|
||||||
} catch (DbException e) {
|
} catch (DbException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeWriter(true);
|
onError();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
LOG.warning("Could not allocate stream context");
|
LOG.warning("Could not allocate stream context");
|
||||||
disposeWriter(true);
|
onError();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (ctx.isHandshakeMode()) {
|
||||||
|
// TODO: Support handshake mode for contacts
|
||||||
|
LOG.warning("Cannot use handshake mode stream context");
|
||||||
|
onError();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Start the incoming session on another thread
|
// Start the incoming session on another thread
|
||||||
ioExecutor.execute(this::runIncomingSession);
|
ioExecutor.execute(this::runIncomingSession);
|
||||||
try {
|
try {
|
||||||
// Create and run the outgoing session
|
// Create and run the outgoing session
|
||||||
outgoingSession = createDuplexOutgoingSession(ctx, writer);
|
SyncSession out = createDuplexOutgoingSession(ctx, writer);
|
||||||
outgoingSession.run();
|
outgoingSession = out;
|
||||||
disposeWriter(false);
|
out.run();
|
||||||
|
writer.dispose(false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeWriter(true);
|
onError();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -378,61 +396,59 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
// Read and recognise the tag
|
// Read and recognise the tag
|
||||||
StreamContext ctx;
|
StreamContext ctx;
|
||||||
try {
|
try {
|
||||||
byte[] tag = readTag(reader);
|
byte[] tag = readTag(reader.getInputStream());
|
||||||
ctx = keyManager.getStreamContext(transportId, tag);
|
ctx = keyManager.getStreamContext(transportId, tag);
|
||||||
} catch (IOException | DbException e) {
|
} catch (IOException | DbException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeReader(true, false);
|
onError();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Unrecognised tags are suspicious in this case
|
// Unrecognised tags are suspicious in this case
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
LOG.warning("Unrecognised tag for returning stream");
|
LOG.warning("Unrecognised tag for returning stream");
|
||||||
disposeReader(true, false);
|
onError();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Check that the stream comes from the expected contact
|
// Check that the stream comes from the expected contact
|
||||||
if (!contactId.equals(ctx.getContactId())) {
|
ContactId inContactId = ctx.getContactId();
|
||||||
|
if (inContactId == null) {
|
||||||
|
LOG.warning("Expected contact tag, got rendezvous tag");
|
||||||
|
onError();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!contactId.equals(inContactId)) {
|
||||||
LOG.warning("Wrong contact ID for returning stream");
|
LOG.warning("Wrong contact ID for returning stream");
|
||||||
disposeReader(true, true);
|
onError();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (ctx.isHandshakeMode()) {
|
||||||
|
// TODO: Support handshake mode for contacts
|
||||||
|
LOG.warning("Received handshake tag, expected rotation mode");
|
||||||
|
onError();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
connectionRegistry.registerConnection(contactId, transportId,
|
connectionRegistry.registerConnection(contactId, transportId,
|
||||||
false);
|
false);
|
||||||
try {
|
try {
|
||||||
// Create and run the incoming session
|
// Create and run the incoming session
|
||||||
incomingSession = createIncomingSession(ctx, reader);
|
createIncomingSession(ctx, reader).run();
|
||||||
incomingSession.run();
|
reader.dispose(false, true);
|
||||||
disposeReader(false, true);
|
// Interrupt the outgoing session so it finishes cleanly
|
||||||
|
SyncSession out = outgoingSession;
|
||||||
|
if (out != null) out.interrupt();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
disposeReader(true, true);
|
onError();
|
||||||
} finally {
|
} finally {
|
||||||
connectionRegistry.unregisterConnection(contactId, transportId,
|
connectionRegistry.unregisterConnection(contactId, transportId,
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void disposeReader(boolean exception, boolean recognised) {
|
private void onError() {
|
||||||
// Interrupt the outgoing session so it finishes cleanly
|
// 'Recognised' is always true for outgoing connections
|
||||||
if (outgoingSession != null) outgoingSession.interrupt();
|
disposeOnError(reader, true);
|
||||||
try {
|
disposeOnError(writer);
|
||||||
reader.dispose(exception, recognised);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logException(LOG, WARNING, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void disposeWriter(boolean exception) {
|
|
||||||
// Interrupt the incoming session if an exception occurred,
|
|
||||||
// otherwise wait for the end of stream marker
|
|
||||||
if (exception && incomingSession != null)
|
|
||||||
incomingSession.interrupt();
|
|
||||||
try {
|
|
||||||
writer.dispose(exception);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logException(LOG, WARNING, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user