diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/PriorityHandler.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/PriorityHandler.java new file mode 100644 index 000000000..57b2bcf98 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/PriorityHandler.java @@ -0,0 +1,13 @@ +package org.briarproject.bramble.api.sync; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +/** + * An interface for handling a {@link Priority} record received by an + * incoming {@link SyncSession}. + */ +@NotNullByDefault +public interface PriorityHandler { + + void handle(Priority p); +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index 216f29398..2107c9a1f 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -6,14 +6,18 @@ import org.briarproject.bramble.api.transport.StreamWriter; import java.io.InputStream; +import javax.annotation.Nullable; + @NotNullByDefault public interface SyncSessionFactory { - SyncSession createIncomingSession(ContactId c, InputStream in); + SyncSession createIncomingSession(ContactId c, InputStream in, + PriorityHandler handler); SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency, StreamWriter streamWriter); SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, StreamWriter streamWriter); + int maxIdleTime, StreamWriter streamWriter, + @Nullable Priority priority); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooser.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooser.java new file mode 100644 index 000000000..da26c904f --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooser.java @@ -0,0 +1,22 @@ +package org.briarproject.bramble.connection; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.Priority; + +@NotNullByDefault +interface ConnectionChooser { + + /** + * Adds the given connection to the chooser with the given priority. + */ + void addConnection(ContactId c, TransportId t, DuplexSyncConnection conn, + Priority p); + + /** + * Removes the given connection from the chooser. + */ + void removeConnection(ContactId c, TransportId t, + DuplexSyncConnection conn); +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooserImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooserImpl.java new file mode 100644 index 000000000..e3c64e449 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooserImpl.java @@ -0,0 +1,97 @@ +package org.briarproject.bramble.connection; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.Priority; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +import javax.annotation.concurrent.GuardedBy; +import javax.inject.Inject; + +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.Bytes.compare; + +@NotNullByDefault +class ConnectionChooserImpl implements ConnectionChooser { + + private static final Logger LOG = + getLogger(ConnectionChooserImpl.class.getName()); + + private final Object lock = new Object(); + @GuardedBy("lock") + private final Map bestConnections = new HashMap<>(); + + @Inject + ConnectionChooserImpl() { + } + + @Override + public void addConnection(ContactId c, TransportId t, + DuplexSyncConnection conn, Priority p) { + synchronized (lock) { + Key k = new Key(c, t); + Value best = bestConnections.get(k); + if (best == null) { + bestConnections.put(k, new Value(conn, p)); + } else if (compare(p.getNonce(), best.priority.getNonce()) > 0) { + LOG.info("Found a better connection"); + bestConnections.put(k, new Value(conn, p)); + best.connection.interruptOutgoingSession(); + } + LOG.info("Already have a better connection"); + conn.interruptOutgoingSession(); + } + } + + @Override + public void removeConnection(ContactId c, TransportId t, + DuplexSyncConnection conn) { + synchronized (lock) { + Key k = new Key(c, t); + Value best = bestConnections.get(k); + if (best.connection == conn) bestConnections.remove(k); + } + } + + private static class Key { + + private final ContactId contactId; + private final TransportId transportId; + + private Key(ContactId contactId, TransportId transportId) { + this.contactId = contactId; + this.transportId = transportId; + } + + @Override + public int hashCode() { + return contactId.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof Key) { + Key k = (Key) o; + return contactId.equals(k.contactId) && + transportId.equals(k.transportId); + } else { + return false; + } + } + } + + private static class Value { + + private final DuplexSyncConnection connection; + private final Priority priority; + + private Value(DuplexSyncConnection connection, Priority priority) { + this.connection = connection; + this.priority = priority; + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java index 7df9b1074..71914cb3c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java @@ -18,6 +18,7 @@ import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriterFactory; +import java.security.SecureRandom; import java.util.concurrent.Executor; import javax.annotation.concurrent.Immutable; @@ -36,6 +37,8 @@ class ConnectionManagerImpl implements ConnectionManager { private final ContactExchangeManager contactExchangeManager; private final ConnectionRegistry connectionRegistry; private final TransportPropertyManager transportPropertyManager; + private final ConnectionChooser connectionChooser; + private final SecureRandom secureRandom; @Inject ConnectionManagerImpl(@IoExecutor Executor ioExecutor, @@ -45,7 +48,8 @@ class ConnectionManagerImpl implements ConnectionManager { HandshakeManager handshakeManager, ContactExchangeManager contactExchangeManager, ConnectionRegistry connectionRegistry, - TransportPropertyManager transportPropertyManager) { + TransportPropertyManager transportPropertyManager, + ConnectionChooser connectionChooser, SecureRandom secureRandom) { this.ioExecutor = ioExecutor; this.keyManager = keyManager; this.streamReaderFactory = streamReaderFactory; @@ -55,6 +59,8 @@ class ConnectionManagerImpl implements ConnectionManager { this.contactExchangeManager = contactExchangeManager; this.connectionRegistry = connectionRegistry; this.transportPropertyManager = transportPropertyManager; + this.connectionChooser = connectionChooser; + this.secureRandom = secureRandom; } @@ -72,7 +78,7 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new IncomingDuplexSyncConnection(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager, ioExecutor, - t, d)); + connectionChooser, t, d)); } @Override @@ -97,7 +103,7 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new OutgoingDuplexSyncConnection(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager, ioExecutor, - c, t, d)); + connectionChooser, secureRandom, c, t, d)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java index e6b8fee3a..08f299352 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java @@ -23,4 +23,11 @@ public class ConnectionModule { ConnectionRegistryImpl connectionRegistry) { return connectionRegistry; } + + @Provides + @Singleton + ConnectionChooser provideConnectionChooser( + ConnectionChooserImpl connectionChooser) { + return connectionChooser; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java index 55be56812..2f5415d26 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java @@ -9,6 +9,7 @@ import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -29,6 +30,7 @@ import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; abstract class DuplexSyncConnection extends SyncConnection { final Executor ioExecutor; + final ConnectionChooser connectionChooser; final TransportId transportId; final TransportConnectionReader reader; final TransportConnectionWriter writer; @@ -65,12 +67,13 @@ abstract class DuplexSyncConnection extends SyncConnection { StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, - Executor ioExecutor, TransportId transportId, - DuplexTransportConnection connection) { + Executor ioExecutor, ConnectionChooser connectionChooser, + TransportId transportId, DuplexTransportConnection connection) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager); this.ioExecutor = ioExecutor; + this.connectionChooser = connectionChooser; this.transportId = transportId; reader = connection.getReader(); writer = connection.getWriter(); @@ -89,11 +92,12 @@ abstract class DuplexSyncConnection extends SyncConnection { } SyncSession createDuplexOutgoingSession(StreamContext ctx, - TransportConnectionWriter w) throws IOException { + TransportConnectionWriter w, @Nullable Priority priority) + throws IOException { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); return syncSessionFactory.createDuplexOutgoingSession(c, - w.getMaxLatency(), w.getMaxIdleTime(), streamWriter); + w.getMaxLatency(), w.getMaxIdleTime(), streamWriter, priority); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java index ba780cbc5..5b72532e2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -30,11 +31,12 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, - Executor ioExecutor, TransportId transportId, - DuplexTransportConnection connection) { + Executor ioExecutor, ConnectionChooser connectionChooser, + TransportId transportId, DuplexTransportConnection connection) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, - transportPropertyManager, ioExecutor, transportId, connection); + transportPropertyManager, ioExecutor, connectionChooser, + transportId, connection); } @Override @@ -65,8 +67,11 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection // Store any transport properties discovered from the connection transportPropertyManager.addRemotePropertiesFromConnection( contactId, transportId, remote); + // Add the connection to the chooser when we receive its priority + PriorityHandler handler = p -> connectionChooser.addConnection( + contactId, transportId, this, p); // Create and run the incoming session - createIncomingSession(ctx, reader).run(); + createIncomingSession(ctx, reader, handler).run(); reader.dispose(false, true); interruptOutgoingSession(); } catch (DbException | IOException e) { @@ -75,6 +80,7 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection } finally { connectionRegistry.unregisterConnection(contactId, transportId, true); + connectionChooser.removeConnection(contactId, transportId, this); } } @@ -88,7 +94,7 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection } try { // Create and run the outgoing session - SyncSession out = createDuplexOutgoingSession(ctx, writer); + SyncSession out = createDuplexOutgoingSession(ctx, writer, null); setOutgoingSession(out); out.run(); writer.dispose(false); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java index 9bf5980f8..2644d8cf4 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java @@ -6,6 +6,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamContext; @@ -60,8 +61,11 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable { } connectionRegistry.registerConnection(contactId, transportId, true); try { + // We don't expect to receive a priority for this connection + PriorityHandler handler = p -> + LOG.info("Ignoring priority for simplex connection"); // Create and run the incoming session - createIncomingSession(ctx, reader).run(); + createIncomingSession(ctx, reader, handler).run(); reader.dispose(false, true); } catch (IOException e) { logException(LOG, WARNING, e); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java index ed6d21d05..0da1c4a18 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java @@ -7,6 +7,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.Priority; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -15,15 +17,18 @@ import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriterFactory; import java.io.IOException; +import java.security.SecureRandom; import java.util.concurrent.Executor; import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.api.sync.SyncConstants.PRIORITY_NONCE_BYTES; import static org.briarproject.bramble.util.LogUtils.logException; @NotNullByDefault class OutgoingDuplexSyncConnection extends DuplexSyncConnection implements Runnable { + private final SecureRandom secureRandom; private final ContactId contactId; OutgoingDuplexSyncConnection(KeyManager keyManager, @@ -32,11 +37,14 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, - Executor ioExecutor, ContactId contactId, TransportId transportId, - DuplexTransportConnection connection) { + Executor ioExecutor, ConnectionChooser connectionChooser, + SecureRandom secureRandom, ContactId contactId, + TransportId transportId, DuplexTransportConnection connection) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, - transportPropertyManager, ioExecutor, transportId, connection); + transportPropertyManager, ioExecutor, connectionChooser, + transportId, connection); + this.secureRandom = secureRandom; this.contactId = contactId; } @@ -56,10 +64,12 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection return; } // Start the incoming session on another thread - ioExecutor.execute(this::runIncomingSession); + Priority priority = generatePriority(); + ioExecutor.execute(() -> runIncomingSession(priority)); try { // Create and run the outgoing session - SyncSession out = createDuplexOutgoingSession(ctx, writer); + SyncSession out = + createDuplexOutgoingSession(ctx, writer, priority); setOutgoingSession(out); out.run(); writer.dispose(false); @@ -69,7 +79,7 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection } } - private void runIncomingSession() { + private void runIncomingSession(Priority priority) { // Read and recognise the tag StreamContext ctx = recogniseTag(reader, transportId); // Unrecognised tags are suspicious in this case @@ -97,12 +107,16 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection return; } connectionRegistry.registerConnection(contactId, transportId, false); + connectionChooser.addConnection(contactId, transportId, this, priority); try { // Store any transport properties discovered from the connection transportPropertyManager.addRemotePropertiesFromConnection( contactId, transportId, remote); + // We don't expect to receive a priority for this connection + PriorityHandler handler = p -> + LOG.info("Ignoring priority for outgoing connection"); // Create and run the incoming session - createIncomingSession(ctx, reader).run(); + createIncomingSession(ctx, reader, handler).run(); reader.dispose(false, true); interruptOutgoingSession(); } catch (DbException | IOException e) { @@ -111,6 +125,7 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection } finally { connectionRegistry.unregisterConnection(contactId, transportId, false); + connectionChooser.removeConnection(contactId, transportId, this); } } @@ -118,4 +133,10 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection // 'Recognised' is always true for outgoing connections onReadError(true); } + + private Priority generatePriority() { + byte[] nonce = new byte[PRIORITY_NONCE_BYTES]; + secureRandom.nextBytes(nonce); + return new Priority(nonce); + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java index 40ac2cac2..6de535552 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -52,10 +53,12 @@ class SyncConnection extends Connection { } SyncSession createIncomingSession(StreamContext ctx, - TransportConnectionReader r) throws IOException { + TransportConnectionReader r, PriorityHandler handler) + throws IOException { InputStream streamReader = streamReaderFactory.createStreamReader( r.getInputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); - return syncSessionFactory.createIncomingSession(c, streamReader); + return syncSessionFactory + .createIncomingSession(c, streamReader, handler); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index 0aa1a8985..fc007cd0f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -14,6 +14,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Offer; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; @@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -74,6 +76,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private final int maxLatency, maxIdleTime; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; + @Nullable + private final Priority priority; private final BlockingQueue> writerTasks; private final AtomicBoolean generateAckQueued = new AtomicBoolean(false); @@ -88,7 +92,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, Clock clock, ContactId contactId, int maxLatency, int maxIdleTime, StreamWriter streamWriter, - SyncRecordWriter recordWriter) { + SyncRecordWriter recordWriter, @Nullable Priority priority) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; @@ -98,6 +102,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { this.maxIdleTime = maxIdleTime; this.streamWriter = streamWriter; this.recordWriter = recordWriter; + this.priority = priority; writerTasks = new LinkedBlockingQueue<>(); } @@ -108,6 +113,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { try { // Send our supported protocol versions recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); + // Send our connection priority, if this is an outgoing connection + if (priority != null) recordWriter.writePriority(priority); // Start a query for each type of record generateAck(); generateBatch(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java index e9c17577e..6414b0490 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java @@ -15,6 +15,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Offer; +import org.briarproject.bramble.api.sync.Priority; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.SyncSession; @@ -47,17 +49,19 @@ class IncomingSession implements SyncSession, EventListener { private final EventBus eventBus; private final ContactId contactId; private final SyncRecordReader recordReader; + private final PriorityHandler priorityHandler; private volatile boolean interrupted = false; IncomingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, ContactId contactId, - SyncRecordReader recordReader) { + SyncRecordReader recordReader, PriorityHandler priorityHandler) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; this.contactId = contactId; this.recordReader = recordReader; + this.priorityHandler = priorityHandler; } @IoExecutor @@ -86,6 +90,9 @@ class IncomingSession implements SyncSession, EventListener { } else if (recordReader.hasVersions()) { Versions v = recordReader.readVersions(); dbExecutor.execute(new ReceiveVersions(v)); + } else if (recordReader.hasPriority()) { + Priority p = recordReader.readPriority(); + priorityHandler.handle(p); } else { // unknown records are ignored in RecordReader#eof() throw new FormatException(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index d35e1164b..64bf25c5d 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -5,6 +5,8 @@ import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.Priority; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.SyncRecordReaderFactory; import org.briarproject.bramble.api.sync.SyncRecordWriter; @@ -18,6 +20,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.Executor; +import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import javax.inject.Inject; @@ -46,10 +49,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { } @Override - public SyncSession createIncomingSession(ContactId c, InputStream in) { + public SyncSession createIncomingSession(ContactId c, InputStream in, + PriorityHandler handler) { SyncRecordReader recordReader = recordReaderFactory.createRecordReader(in); - return new IncomingSession(db, dbExecutor, eventBus, c, recordReader); + return new IncomingSession(db, dbExecutor, eventBus, c, recordReader, + handler); } @Override @@ -64,11 +69,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { @Override public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, StreamWriter streamWriter) { + int maxIdleTime, StreamWriter streamWriter, + @Nullable Priority priority) { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, - maxLatency, maxIdleTime, streamWriter, recordWriter); + maxLatency, maxIdleTime, streamWriter, recordWriter, priority); } }