Add ConnectionManager method for incoming mailbox connections.

This commit is contained in:
akwizgran
2022-06-01 14:46:23 +01:00
parent de63a50662
commit 6aa24af94c
4 changed files with 85 additions and 9 deletions

View File

@@ -54,7 +54,7 @@ abstract class Connection {
}
}
private byte[] readTag(InputStream in) throws IOException {
byte[] readTag(InputStream in) throws IOException {
byte[] tag = new byte[TAG_LENGTH];
read(in, tag);
return tag;

View File

@@ -67,7 +67,15 @@ class ConnectionManagerImpl implements ConnectionManager {
TransportConnectionReader r) {
ioExecutor.execute(new IncomingSimplexSyncConnection(keyManager,
connectionRegistry, streamReaderFactory, streamWriterFactory,
syncSessionFactory, transportPropertyManager, t, r));
syncSessionFactory, transportPropertyManager, t, r, null));
}
@Override
public void manageIncomingConnection(TransportId t,
TransportConnectionReader r, TagController c) {
ioExecutor.execute(new IncomingSimplexSyncConnection(keyManager,
connectionRegistry, streamReaderFactory, streamWriterFactory,
syncSessionFactory, transportPropertyManager, t, r, c));
}
@Override

View File

@@ -1,7 +1,9 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.connection.ConnectionManager.TagController;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.plugin.TransportId;
@@ -15,7 +17,10 @@ import org.briarproject.bramble.api.transport.StreamWriterFactory;
import java.io.IOException;
import javax.annotation.Nullable;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
import static org.briarproject.bramble.util.LogUtils.logException;
@NotNullByDefault
@@ -23,6 +28,8 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable {
private final TransportId transportId;
private final TransportConnectionReader reader;
@Nullable
private final TagController tagController;
IncomingSimplexSyncConnection(KeyManager keyManager,
ConnectionRegistry connectionRegistry,
@@ -30,33 +37,50 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable {
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
TransportPropertyManager transportPropertyManager,
TransportId transportId, TransportConnectionReader reader) {
TransportId transportId,
TransportConnectionReader reader,
@Nullable TagController tagController) {
super(keyManager, connectionRegistry, streamReaderFactory,
streamWriterFactory, syncSessionFactory,
transportPropertyManager);
this.transportId = transportId;
this.reader = reader;
this.tagController = tagController;
}
@Override
public void run() {
// Read and recognise the tag
StreamContext ctx = recogniseTag(reader, transportId);
byte[] tag = null;
StreamContext ctx;
try {
tag = readTag(reader.getInputStream());
// If we have a tag controller, defer marking the tag as recognised
if (tagController == null) {
ctx = keyManager.getStreamContext(transportId, tag);
} else {
ctx = keyManager.getStreamContextOnly(transportId, tag);
}
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
onError(false, tag);
return;
}
if (ctx == null) {
LOG.info("Unrecognised tag");
onError(false);
onError(false, tag);
return;
}
ContactId contactId = ctx.getContactId();
if (contactId == null) {
LOG.warning("Received rendezvous stream, expected contact");
onError(true);
onError(true, tag);
return;
}
if (ctx.isHandshakeMode()) {
// TODO: Support handshake mode for contacts
LOG.warning("Received handshake tag, expected rotation mode");
onError(true);
onError(true, tag);
return;
}
try {
@@ -65,15 +89,31 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable {
LOG.info("Ignoring priority for simplex connection");
// Create and run the incoming session
createIncomingSession(ctx, reader, handler).run();
// Success
markTagAsRecognisedIfRequired(false, tag);
reader.dispose(false, true);
} catch (IOException e) {
logException(LOG, WARNING, e);
onError(true);
onError(true, tag);
}
}
private void onError(boolean recognised) {
private void onError(boolean recognised, @Nullable byte[] tag) {
if (recognised) {
markTagAsRecognisedIfRequired(true, requireNonNull(tag));
}
disposeOnError(reader, recognised);
}
private void markTagAsRecognisedIfRequired(boolean exception, byte[] tag) {
if (tagController != null &&
tagController.shouldMarkTagAsRecognised(exception)) {
try {
keyManager.markTagAsRecognised(transportId, tag);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
}
}
}