Exchange priority records and close redundant connections.

This commit is contained in:
akwizgran
2020-05-12 21:18:56 +01:00
parent ee9c771045
commit 1b2b50d91b
14 changed files with 237 additions and 30 deletions

View File

@@ -0,0 +1,13 @@
package org.briarproject.bramble.api.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
/**
* An interface for handling a {@link Priority} record received by an
* incoming {@link SyncSession}.
*/
@NotNullByDefault
public interface PriorityHandler {
void handle(Priority p);
}

View File

@@ -6,14 +6,18 @@ import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.InputStream;
import javax.annotation.Nullable;
@NotNullByDefault
public interface SyncSessionFactory {
SyncSession createIncomingSession(ContactId c, InputStream in);
SyncSession createIncomingSession(ContactId c, InputStream in,
PriorityHandler handler);
SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency,
StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, StreamWriter streamWriter);
int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority);
}

View File

@@ -0,0 +1,22 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Priority;
@NotNullByDefault
interface ConnectionChooser {
/**
* Adds the given connection to the chooser with the given priority.
*/
void addConnection(ContactId c, TransportId t, DuplexSyncConnection conn,
Priority p);
/**
* Removes the given connection from the chooser.
*/
void removeConnection(ContactId c, TransportId t,
DuplexSyncConnection conn);
}

View File

@@ -0,0 +1,97 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Priority;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.Bytes.compare;
@NotNullByDefault
class ConnectionChooserImpl implements ConnectionChooser {
private static final Logger LOG =
getLogger(ConnectionChooserImpl.class.getName());
private final Object lock = new Object();
@GuardedBy("lock")
private final Map<Key, Value> bestConnections = new HashMap<>();
@Inject
ConnectionChooserImpl() {
}
@Override
public void addConnection(ContactId c, TransportId t,
DuplexSyncConnection conn, Priority p) {
synchronized (lock) {
Key k = new Key(c, t);
Value best = bestConnections.get(k);
if (best == null) {
bestConnections.put(k, new Value(conn, p));
} else if (compare(p.getNonce(), best.priority.getNonce()) > 0) {
LOG.info("Found a better connection");
bestConnections.put(k, new Value(conn, p));
best.connection.interruptOutgoingSession();
}
LOG.info("Already have a better connection");
conn.interruptOutgoingSession();
}
}
@Override
public void removeConnection(ContactId c, TransportId t,
DuplexSyncConnection conn) {
synchronized (lock) {
Key k = new Key(c, t);
Value best = bestConnections.get(k);
if (best.connection == conn) bestConnections.remove(k);
}
}
private static class Key {
private final ContactId contactId;
private final TransportId transportId;
private Key(ContactId contactId, TransportId transportId) {
this.contactId = contactId;
this.transportId = transportId;
}
@Override
public int hashCode() {
return contactId.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof Key) {
Key k = (Key) o;
return contactId.equals(k.contactId) &&
transportId.equals(k.transportId);
} else {
return false;
}
}
}
private static class Value {
private final DuplexSyncConnection connection;
private final Priority priority;
private Value(DuplexSyncConnection connection, Priority priority) {
this.connection = connection;
this.priority = priority;
}
}
}

View File

@@ -18,6 +18,7 @@ import org.briarproject.bramble.api.transport.KeyManager;
import org.briarproject.bramble.api.transport.StreamReaderFactory;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.Immutable;
@@ -36,6 +37,8 @@ class ConnectionManagerImpl implements ConnectionManager {
private final ContactExchangeManager contactExchangeManager;
private final ConnectionRegistry connectionRegistry;
private final TransportPropertyManager transportPropertyManager;
private final ConnectionChooser connectionChooser;
private final SecureRandom secureRandom;
@Inject
ConnectionManagerImpl(@IoExecutor Executor ioExecutor,
@@ -45,7 +48,8 @@ class ConnectionManagerImpl implements ConnectionManager {
HandshakeManager handshakeManager,
ContactExchangeManager contactExchangeManager,
ConnectionRegistry connectionRegistry,
TransportPropertyManager transportPropertyManager) {
TransportPropertyManager transportPropertyManager,
ConnectionChooser connectionChooser, SecureRandom secureRandom) {
this.ioExecutor = ioExecutor;
this.keyManager = keyManager;
this.streamReaderFactory = streamReaderFactory;
@@ -55,6 +59,8 @@ class ConnectionManagerImpl implements ConnectionManager {
this.contactExchangeManager = contactExchangeManager;
this.connectionRegistry = connectionRegistry;
this.transportPropertyManager = transportPropertyManager;
this.connectionChooser = connectionChooser;
this.secureRandom = secureRandom;
}
@@ -72,7 +78,7 @@ class ConnectionManagerImpl implements ConnectionManager {
ioExecutor.execute(new IncomingDuplexSyncConnection(keyManager,
connectionRegistry, streamReaderFactory, streamWriterFactory,
syncSessionFactory, transportPropertyManager, ioExecutor,
t, d));
connectionChooser, t, d));
}
@Override
@@ -97,7 +103,7 @@ class ConnectionManagerImpl implements ConnectionManager {
ioExecutor.execute(new OutgoingDuplexSyncConnection(keyManager,
connectionRegistry, streamReaderFactory, streamWriterFactory,
syncSessionFactory, transportPropertyManager, ioExecutor,
c, t, d));
connectionChooser, secureRandom, c, t, d));
}
@Override

View File

@@ -23,4 +23,11 @@ public class ConnectionModule {
ConnectionRegistryImpl connectionRegistry) {
return connectionRegistry;
}
@Provides
@Singleton
ConnectionChooser provideConnectionChooser(
ConnectionChooserImpl connectionChooser) {
return connectionChooser;
}
}

View File

@@ -9,6 +9,7 @@ import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
@@ -29,6 +30,7 @@ import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
abstract class DuplexSyncConnection extends SyncConnection {
final Executor ioExecutor;
final ConnectionChooser connectionChooser;
final TransportId transportId;
final TransportConnectionReader reader;
final TransportConnectionWriter writer;
@@ -65,12 +67,13 @@ abstract class DuplexSyncConnection extends SyncConnection {
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
TransportPropertyManager transportPropertyManager,
Executor ioExecutor, TransportId transportId,
DuplexTransportConnection connection) {
Executor ioExecutor, ConnectionChooser connectionChooser,
TransportId transportId, DuplexTransportConnection connection) {
super(keyManager, connectionRegistry, streamReaderFactory,
streamWriterFactory, syncSessionFactory,
transportPropertyManager);
this.ioExecutor = ioExecutor;
this.connectionChooser = connectionChooser;
this.transportId = transportId;
reader = connection.getReader();
writer = connection.getWriter();
@@ -89,11 +92,12 @@ abstract class DuplexSyncConnection extends SyncConnection {
}
SyncSession createDuplexOutgoingSession(StreamContext ctx,
TransportConnectionWriter w) throws IOException {
TransportConnectionWriter w, @Nullable Priority priority)
throws IOException {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createDuplexOutgoingSession(c,
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter);
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter, priority);
}
}

View File

@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
@@ -30,11 +31,12 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
TransportPropertyManager transportPropertyManager,
Executor ioExecutor, TransportId transportId,
DuplexTransportConnection connection) {
Executor ioExecutor, ConnectionChooser connectionChooser,
TransportId transportId, DuplexTransportConnection connection) {
super(keyManager, connectionRegistry, streamReaderFactory,
streamWriterFactory, syncSessionFactory,
transportPropertyManager, ioExecutor, transportId, connection);
transportPropertyManager, ioExecutor, connectionChooser,
transportId, connection);
}
@Override
@@ -65,8 +67,11 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
// Store any transport properties discovered from the connection
transportPropertyManager.addRemotePropertiesFromConnection(
contactId, transportId, remote);
// Add the connection to the chooser when we receive its priority
PriorityHandler handler = p -> connectionChooser.addConnection(
contactId, transportId, this, p);
// Create and run the incoming session
createIncomingSession(ctx, reader).run();
createIncomingSession(ctx, reader, handler).run();
reader.dispose(false, true);
interruptOutgoingSession();
} catch (DbException | IOException e) {
@@ -75,6 +80,7 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
true);
connectionChooser.removeConnection(contactId, transportId, this);
}
}
@@ -88,7 +94,7 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
}
try {
// Create and run the outgoing session
SyncSession out = createDuplexOutgoingSession(ctx, writer);
SyncSession out = createDuplexOutgoingSession(ctx, writer, null);
setOutgoingSession(out);
out.run();
writer.dispose(false);

View File

@@ -6,6 +6,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
import org.briarproject.bramble.api.transport.StreamContext;
@@ -60,8 +61,11 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable {
}
connectionRegistry.registerConnection(contactId, transportId, true);
try {
// We don't expect to receive a priority for this connection
PriorityHandler handler = p ->
LOG.info("Ignoring priority for simplex connection");
// Create and run the incoming session
createIncomingSession(ctx, reader).run();
createIncomingSession(ctx, reader, handler).run();
reader.dispose(false, true);
} catch (IOException e) {
logException(LOG, WARNING, e);

View File

@@ -7,6 +7,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
@@ -15,15 +17,18 @@ import org.briarproject.bramble.api.transport.StreamReaderFactory;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.sync.SyncConstants.PRIORITY_NONCE_BYTES;
import static org.briarproject.bramble.util.LogUtils.logException;
@NotNullByDefault
class OutgoingDuplexSyncConnection extends DuplexSyncConnection
implements Runnable {
private final SecureRandom secureRandom;
private final ContactId contactId;
OutgoingDuplexSyncConnection(KeyManager keyManager,
@@ -32,11 +37,14 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
TransportPropertyManager transportPropertyManager,
Executor ioExecutor, ContactId contactId, TransportId transportId,
DuplexTransportConnection connection) {
Executor ioExecutor, ConnectionChooser connectionChooser,
SecureRandom secureRandom, ContactId contactId,
TransportId transportId, DuplexTransportConnection connection) {
super(keyManager, connectionRegistry, streamReaderFactory,
streamWriterFactory, syncSessionFactory,
transportPropertyManager, ioExecutor, transportId, connection);
transportPropertyManager, ioExecutor, connectionChooser,
transportId, connection);
this.secureRandom = secureRandom;
this.contactId = contactId;
}
@@ -56,10 +64,12 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
return;
}
// Start the incoming session on another thread
ioExecutor.execute(this::runIncomingSession);
Priority priority = generatePriority();
ioExecutor.execute(() -> runIncomingSession(priority));
try {
// Create and run the outgoing session
SyncSession out = createDuplexOutgoingSession(ctx, writer);
SyncSession out =
createDuplexOutgoingSession(ctx, writer, priority);
setOutgoingSession(out);
out.run();
writer.dispose(false);
@@ -69,7 +79,7 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
}
}
private void runIncomingSession() {
private void runIncomingSession(Priority priority) {
// Read and recognise the tag
StreamContext ctx = recogniseTag(reader, transportId);
// Unrecognised tags are suspicious in this case
@@ -97,12 +107,16 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
return;
}
connectionRegistry.registerConnection(contactId, transportId, false);
connectionChooser.addConnection(contactId, transportId, this, priority);
try {
// Store any transport properties discovered from the connection
transportPropertyManager.addRemotePropertiesFromConnection(
contactId, transportId, remote);
// We don't expect to receive a priority for this connection
PriorityHandler handler = p ->
LOG.info("Ignoring priority for outgoing connection");
// Create and run the incoming session
createIncomingSession(ctx, reader).run();
createIncomingSession(ctx, reader, handler).run();
reader.dispose(false, true);
interruptOutgoingSession();
} catch (DbException | IOException e) {
@@ -111,6 +125,7 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
false);
connectionChooser.removeConnection(contactId, transportId, this);
}
}
@@ -118,4 +133,10 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
// 'Recognised' is always true for outgoing connections
onReadError(true);
}
private Priority generatePriority() {
byte[] nonce = new byte[PRIORITY_NONCE_BYTES];
secureRandom.nextBytes(nonce);
return new Priority(nonce);
}
}

View File

@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
@@ -52,10 +53,12 @@ class SyncConnection extends Connection {
}
SyncSession createIncomingSession(StreamContext ctx,
TransportConnectionReader r) throws IOException {
TransportConnectionReader r, PriorityHandler handler)
throws IOException {
InputStream streamReader = streamReaderFactory.createStreamReader(
r.getInputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createIncomingSession(c, streamReader);
return syncSessionFactory
.createIncomingSession(c, streamReader, handler);
}
}

View File

@@ -14,6 +14,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -74,6 +76,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final int maxLatency, maxIdleTime;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
@Nullable
private final Priority priority;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private final AtomicBoolean generateAckQueued = new AtomicBoolean(false);
@@ -88,7 +92,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
int maxIdleTime, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
SyncRecordWriter recordWriter, @Nullable Priority priority) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
@@ -98,6 +102,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
this.maxIdleTime = maxIdleTime;
this.streamWriter = streamWriter;
this.recordWriter = recordWriter;
this.priority = priority;
writerTasks = new LinkedBlockingQueue<>();
}
@@ -108,6 +113,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
try {
// Send our supported protocol versions
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
// Send our connection priority, if this is an outgoing connection
if (priority != null) recordWriter.writePriority(priority);
// Start a query for each type of record
generateAck();
generateBatch();

View File

@@ -15,6 +15,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordReader;
import org.briarproject.bramble.api.sync.SyncSession;
@@ -47,17 +49,19 @@ class IncomingSession implements SyncSession, EventListener {
private final EventBus eventBus;
private final ContactId contactId;
private final SyncRecordReader recordReader;
private final PriorityHandler priorityHandler;
private volatile boolean interrupted = false;
IncomingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId,
SyncRecordReader recordReader) {
SyncRecordReader recordReader, PriorityHandler priorityHandler) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.contactId = contactId;
this.recordReader = recordReader;
this.priorityHandler = priorityHandler;
}
@IoExecutor
@@ -86,6 +90,9 @@ class IncomingSession implements SyncSession, EventListener {
} else if (recordReader.hasVersions()) {
Versions v = recordReader.readVersions();
dbExecutor.execute(new ReceiveVersions(v));
} else if (recordReader.hasPriority()) {
Priority p = recordReader.readPriority();
priorityHandler.handle(p);
} else {
// unknown records are ignored in RecordReader#eof()
throw new FormatException();

View File

@@ -5,6 +5,8 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncRecordReader;
import org.briarproject.bramble.api.sync.SyncRecordReaderFactory;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
@@ -18,6 +20,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
@@ -46,10 +49,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
}
@Override
public SyncSession createIncomingSession(ContactId c, InputStream in) {
public SyncSession createIncomingSession(ContactId c, InputStream in,
PriorityHandler handler) {
SyncRecordReader recordReader =
recordReaderFactory.createRecordReader(in);
return new IncomingSession(db, dbExecutor, eventBus, c, recordReader);
return new IncomingSession(db, dbExecutor, eventBus, c, recordReader,
handler);
}
@Override
@@ -64,11 +69,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
@Override
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, StreamWriter streamWriter) {
int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority) {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
maxLatency, maxIdleTime, streamWriter, recordWriter);
maxLatency, maxIdleTime, streamWriter, recordWriter, priority);
}
}