diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java index 715a4da9e..d23b0144a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java @@ -3,6 +3,7 @@ package org.briarproject.bramble.api.connection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; @@ -10,6 +11,7 @@ import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent; import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent; +import org.briarproject.bramble.api.sync.Priority; import java.util.Collection; @@ -20,26 +22,86 @@ import java.util.Collection; public interface ConnectionRegistry { /** - * Registers a connection with the given contact over the given transport. + * Registers an incoming connection from the given contact over the given + * transport. The connection's {@link Priority priority} can be set later + * via {@link #setPriority(ContactId, TransportId, InterruptibleConnection, + * Priority)} if a priority record is received from the contact. + *

* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts * {@link ContactConnectedEvent} if this is the only connection with the * contact. */ - void registerConnection(ContactId c, TransportId t, boolean incoming); + void registerIncomingConnection(ContactId c, TransportId t, + InterruptibleConnection conn); + + /** + * Registers an outgoing connection to the given contact over the given + * transport. + *

+ * Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts + * {@link ContactConnectedEvent} if this is the only connection with the + * contact. + *

+ * If the registry has any "better" connections with the given contact, the + * given connection will be interrupted. If the registry has any "worse" + * connections with the given contact, those connections will be + * interrupted. + *

+ * Connection A is considered "better" than connection B if both + * connections have had their priorities set, and either A's transport is + * {@link PluginConfig#getTransportPreferences() preferred} to B's, or + * they use the same transport and A has higher {@link Priority priority} + * than B. + *

+ * For backward compatibility, connections without priorities are not + * considered better or worse than other connections. + */ + void registerOutgoingConnection(ContactId c, TransportId t, + InterruptibleConnection conn, Priority priority); /** * Unregisters a connection with the given contact over the given transport. + *

* Broadcasts {@link ConnectionClosedEvent}. Also broadcasts * {@link ContactDisconnectedEvent} if this is the only connection with * the contact. */ - void unregisterConnection(ContactId c, TransportId t, boolean incoming); + void unregisterConnection(ContactId c, TransportId t, + InterruptibleConnection conn, boolean incoming, boolean exception); + + /** + * Sets the {@link Priority priority} of a connection that was previously + * registered via {@link #registerIncomingConnection(ContactId, TransportId, + * InterruptibleConnection)}. + *

+ * If the registry has any "better" connections with the given contact, the + * given connection will be interrupted. If the registry has any "worse" + * connections with the given contact, those connections will be + * interrupted. + *

+ * Connection A is considered "better" than connection B if both + * connections have had their priorities set, and either A's transport is + * {@link PluginConfig#getTransportPreferences() preferred} to B's, or + * they use the same transport and A has higher {@link Priority priority} + * than B. + *

+ * For backward compatibility, connections without priorities are not + * considered better or worse than other connections. + */ + void setPriority(ContactId c, TransportId t, InterruptibleConnection conn, + Priority priority); /** * Returns any contacts that are connected via the given transport. */ Collection getConnectedContacts(TransportId t); + /** + * Returns any contacts that are connected via the given transport or any + * {@link PluginConfig#getTransportPreferences() better} transport. + */ + Collection getConnectedOrBetterContacts(TransportId t); + /** * Returns true if the given contact is connected via the given transport. */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/InterruptibleConnection.java b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/InterruptibleConnection.java new file mode 100644 index 000000000..59b46b1ec --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/InterruptibleConnection.java @@ -0,0 +1,19 @@ +package org.briarproject.bramble.api.connection; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +/** + * A duplex sync connection that can be closed by interrupting its outgoing + * sync session. + */ +@NotNullByDefault +public interface InterruptibleConnection { + + /** + * Interrupts the connection's outgoing sync session. If the underlying + * transport connection is alive and the remote peer is cooperative, this + * should result in both sync sessions ending and the connection being + * cleanly closed. + */ + void interruptOutgoingSession(); +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/PluginConfig.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/PluginConfig.java index 6bf13cedd..61063030c 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/PluginConfig.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/PluginConfig.java @@ -5,6 +5,8 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory; import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import java.util.Collection; +import java.util.List; +import java.util.Map; @NotNullByDefault public interface PluginConfig { @@ -14,4 +16,11 @@ public interface PluginConfig { Collection getSimplexFactories(); boolean shouldPoll(); + + /** + * Returns a map representing transport preferences. For each entry in the + * map, connections via the transports identified by the value are + * preferred to connections via the transport identified by the key. + */ + Map> getTransportPreferences(); } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/event/ConnectionClosedEvent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/event/ConnectionClosedEvent.java index 05c4cbdd7..6d5e5293b 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/event/ConnectionClosedEvent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/event/ConnectionClosedEvent.java @@ -13,13 +13,14 @@ public class ConnectionClosedEvent extends Event { private final ContactId contactId; private final TransportId transportId; - private final boolean incoming; + private final boolean incoming, exception; public ConnectionClosedEvent(ContactId contactId, TransportId transportId, - boolean incoming) { + boolean incoming, boolean exception) { this.contactId = contactId; this.transportId = transportId; this.incoming = incoming; + this.exception = exception; } public ContactId getContactId() { @@ -33,4 +34,8 @@ public class ConnectionClosedEvent extends Event { public boolean isIncoming() { return incoming; } + + public boolean isException() { + return exception; + } } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/Priority.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/Priority.java new file mode 100644 index 000000000..44a2169b7 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/Priority.java @@ -0,0 +1,23 @@ +package org.briarproject.bramble.api.sync; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +import javax.annotation.concurrent.Immutable; + +/** + * A record containing a nonce for choosing between redundant sessions. + */ +@Immutable +@NotNullByDefault +public class Priority { + + private final byte[] nonce; + + public Priority(byte[] nonce) { + this.nonce = nonce; + } + + public byte[] getNonce() { + return nonce; + } +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/PriorityHandler.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/PriorityHandler.java new file mode 100644 index 000000000..57b2bcf98 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/PriorityHandler.java @@ -0,0 +1,13 @@ +package org.briarproject.bramble.api.sync; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +/** + * An interface for handling a {@link Priority} record received by an + * incoming {@link SyncSession}. + */ +@NotNullByDefault +public interface PriorityHandler { + + void handle(Priority p); +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordTypes.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordTypes.java index 168b4c9ef..7fdbe0af7 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordTypes.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordTypes.java @@ -10,4 +10,5 @@ public interface RecordTypes { byte OFFER = 2; byte REQUEST = 3; byte VERSIONS = 4; + byte PRIORITY = 5; } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java index c430ac05f..2e9241bed 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java @@ -49,4 +49,10 @@ public interface SyncConstants { * simultaneously. */ int MAX_SUPPORTED_VERSIONS = 10; + + /** + * The length of the priority nonce used for choosing between redundant + * connections. + */ + int PRIORITY_NONCE_BYTES = 16; } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordReader.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordReader.java index 55650e407..c0f9b947a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordReader.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordReader.java @@ -28,4 +28,8 @@ public interface SyncRecordReader { boolean hasVersions() throws IOException; Versions readVersions() throws IOException; + + boolean hasPriority() throws IOException; + + Priority readPriority() throws IOException; } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java index bdeca0cf8..75d4b8401 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java @@ -17,5 +17,7 @@ public interface SyncRecordWriter { void writeVersions(Versions v) throws IOException; + void writePriority(Priority p) throws IOException; + void flush() throws IOException; } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index 216f29398..2107c9a1f 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -6,14 +6,18 @@ import org.briarproject.bramble.api.transport.StreamWriter; import java.io.InputStream; +import javax.annotation.Nullable; + @NotNullByDefault public interface SyncSessionFactory { - SyncSession createIncomingSession(ContactId c, InputStream in); + SyncSession createIncomingSession(ContactId c, InputStream in, + PriorityHandler handler); SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency, StreamWriter streamWriter); SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, StreamWriter streamWriter); + int maxIdleTime, StreamWriter streamWriter, + @Nullable Priority priority); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java index 7df9b1074..2a50033b1 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java @@ -18,6 +18,7 @@ import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriterFactory; +import java.security.SecureRandom; import java.util.concurrent.Executor; import javax.annotation.concurrent.Immutable; @@ -36,6 +37,7 @@ class ConnectionManagerImpl implements ConnectionManager { private final ContactExchangeManager contactExchangeManager; private final ConnectionRegistry connectionRegistry; private final TransportPropertyManager transportPropertyManager; + private final SecureRandom secureRandom; @Inject ConnectionManagerImpl(@IoExecutor Executor ioExecutor, @@ -45,7 +47,8 @@ class ConnectionManagerImpl implements ConnectionManager { HandshakeManager handshakeManager, ContactExchangeManager contactExchangeManager, ConnectionRegistry connectionRegistry, - TransportPropertyManager transportPropertyManager) { + TransportPropertyManager transportPropertyManager, + SecureRandom secureRandom) { this.ioExecutor = ioExecutor; this.keyManager = keyManager; this.streamReaderFactory = streamReaderFactory; @@ -55,6 +58,7 @@ class ConnectionManagerImpl implements ConnectionManager { this.contactExchangeManager = contactExchangeManager; this.connectionRegistry = connectionRegistry; this.transportPropertyManager = transportPropertyManager; + this.secureRandom = secureRandom; } @@ -97,7 +101,7 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new OutgoingDuplexSyncConnection(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager, ioExecutor, - c, t, d)); + secureRandom, c, t, d)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java index 1bba22593..bf33c7387 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java @@ -1,11 +1,13 @@ package org.briarproject.bramble.connection; -import org.briarproject.bramble.api.Multiset; +import org.briarproject.bramble.api.Bytes; import org.briarproject.bramble.api.connection.ConnectionRegistry; +import org.briarproject.bramble.api.connection.InterruptibleConnection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; @@ -13,21 +15,24 @@ import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent; import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent; +import org.briarproject.bramble.api.sync.Priority; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; +import static java.util.Collections.emptyList; import static java.util.logging.Level.INFO; import static java.util.logging.Logger.getLogger; @@ -39,39 +44,56 @@ class ConnectionRegistryImpl implements ConnectionRegistry { getLogger(ConnectionRegistryImpl.class.getName()); private final EventBus eventBus; + private final Map> transportPrefs; private final Object lock = new Object(); @GuardedBy("lock") - private final Map> contactConnections; - @GuardedBy("lock") - private final Multiset contactCounts; + private final Map> contactConnections; @GuardedBy("lock") private final Set connectedPendingContacts; @Inject - ConnectionRegistryImpl(EventBus eventBus) { + ConnectionRegistryImpl(EventBus eventBus, PluginConfig pluginConfig) { this.eventBus = eventBus; + transportPrefs = pluginConfig.getTransportPreferences(); contactConnections = new HashMap<>(); - contactCounts = new Multiset<>(); connectedPendingContacts = new HashSet<>(); } @Override - public void registerConnection(ContactId c, TransportId t, - boolean incoming) { + public void registerIncomingConnection(ContactId c, TransportId t, + InterruptibleConnection conn) { + if (LOG.isLoggable(INFO)) { + LOG.info("Incoming connection registered: " + t); + } + registerConnection(c, t, conn, true); + } + + @Override + public void registerOutgoingConnection(ContactId c, TransportId t, + InterruptibleConnection conn, Priority priority) { + if (LOG.isLoggable(INFO)) { + LOG.info("Outgoing connection registered: " + t); + } + registerConnection(c, t, conn, false); + setPriority(c, t, conn, priority); + } + + private void registerConnection(ContactId c, TransportId t, + InterruptibleConnection conn, boolean incoming) { if (LOG.isLoggable(INFO)) { if (incoming) LOG.info("Incoming connection registered: " + t); else LOG.info("Outgoing connection registered: " + t); } - boolean firstConnection = false; + boolean firstConnection; synchronized (lock) { - Multiset m = contactConnections.get(t); - if (m == null) { - m = new Multiset<>(); - contactConnections.put(t, m); + List recs = contactConnections.get(c); + if (recs == null) { + recs = new ArrayList<>(); + contactConnections.put(c, recs); } - m.add(c); - if (contactCounts.add(c) == 1) firstConnection = true; + firstConnection = recs.isEmpty(); + recs.add(new ConnectionRecord(t, conn)); } eventBus.broadcast(new ConnectionOpenedEvent(c, t, incoming)); if (firstConnection) { @@ -80,22 +102,72 @@ class ConnectionRegistryImpl implements ConnectionRegistry { } } + @Override + public void setPriority(ContactId c, TransportId t, + InterruptibleConnection conn, Priority priority) { + if (LOG.isLoggable(INFO)) LOG.info("Setting connection priority: " + t); + List toInterrupt; + boolean interruptNewConnection = false; + synchronized (lock) { + List recs = contactConnections.get(c); + if (recs == null) throw new IllegalArgumentException(); + toInterrupt = new ArrayList<>(recs.size()); + for (ConnectionRecord rec : recs) { + if (rec.conn == conn) { + // Store the priority of this connection + rec.priority = priority; + } else if (rec.priority != null) { + int compare = compareConnections(t, priority, + rec.transportId, rec.priority); + if (compare == -1) { + // The old connection is better than the new one + interruptNewConnection = true; + } else if (compare == 1 && !rec.interrupted) { + // The new connection is better than the old one + toInterrupt.add(rec.conn); + rec.interrupted = true; + } + } + } + } + if (interruptNewConnection) { + LOG.info("Interrupting new connection"); + conn.interruptOutgoingSession(); + } + for (InterruptibleConnection old : toInterrupt) { + LOG.info("Interrupting old connection"); + old.interruptOutgoingSession(); + } + } + + private int compareConnections(TransportId tA, Priority pA, TransportId tB, + Priority pB) { + if (getBetterTransports(tA).contains(tB)) return -1; + if (getBetterTransports(tB).contains(tA)) return 1; + return tA.equals(tB) ? Bytes.compare(pA.getNonce(), pB.getNonce()) : 0; + } + + private List getBetterTransports(TransportId t) { + List better = transportPrefs.get(t); + return better == null ? emptyList() : better; + } + @Override public void unregisterConnection(ContactId c, TransportId t, - boolean incoming) { + InterruptibleConnection conn, boolean incoming, boolean exception) { if (LOG.isLoggable(INFO)) { if (incoming) LOG.info("Incoming connection unregistered: " + t); else LOG.info("Outgoing connection unregistered: " + t); } - boolean lastConnection = false; + boolean lastConnection; synchronized (lock) { - Multiset m = contactConnections.get(t); - if (m == null || !m.contains(c)) + List recs = contactConnections.get(c); + if (recs == null || !recs.remove(new ConnectionRecord(t, conn))) throw new IllegalArgumentException(); - m.remove(c); - if (contactCounts.remove(c) == 0) lastConnection = true; + lastConnection = recs.isEmpty(); } - eventBus.broadcast(new ConnectionClosedEvent(c, t, incoming)); + eventBus.broadcast( + new ConnectionClosedEvent(c, t, incoming, exception)); if (lastConnection) { LOG.info("Contact disconnected"); eventBus.broadcast(new ContactDisconnectedEvent(c)); @@ -105,27 +177,63 @@ class ConnectionRegistryImpl implements ConnectionRegistry { @Override public Collection getConnectedContacts(TransportId t) { synchronized (lock) { - Multiset m = contactConnections.get(t); - if (m == null) return Collections.emptyList(); - List ids = new ArrayList<>(m.keySet()); - if (LOG.isLoggable(INFO)) - LOG.info(ids.size() + " contacts connected: " + t); - return ids; + List contactIds = new ArrayList<>(); + for (Entry> e : + contactConnections.entrySet()) { + for (ConnectionRecord rec : e.getValue()) { + if (rec.transportId.equals(t)) { + contactIds.add(e.getKey()); + break; + } + } + } + if (LOG.isLoggable(INFO)) { + LOG.info(contactIds.size() + " contacts connected: " + t); + } + return contactIds; + } + } + + @Override + public Collection getConnectedOrBetterContacts(TransportId t) { + synchronized (lock) { + List better = getBetterTransports(t); + List contactIds = new ArrayList<>(); + for (Entry> e : + contactConnections.entrySet()) { + for (ConnectionRecord rec : e.getValue()) { + if (rec.transportId.equals(t) || + better.contains(rec.transportId)) { + contactIds.add(e.getKey()); + break; + } + } + } + if (LOG.isLoggable(INFO)) { + LOG.info(contactIds.size() + + " contacts connected or better: " + t); + } + return contactIds; } } @Override public boolean isConnected(ContactId c, TransportId t) { synchronized (lock) { - Multiset m = contactConnections.get(t); - return m != null && m.contains(c); + List recs = contactConnections.get(c); + if (recs == null) return false; + for (ConnectionRecord rec : recs) { + if (rec.transportId.equals(t)) return true; + } + return false; } } @Override public boolean isConnected(ContactId c) { synchronized (lock) { - return contactCounts.contains(c); + List recs = contactConnections.get(c); + return recs != null && !recs.isEmpty(); } } @@ -147,4 +255,35 @@ class ConnectionRegistryImpl implements ConnectionRegistry { } eventBus.broadcast(new RendezvousConnectionClosedEvent(p, success)); } + + private static class ConnectionRecord { + + private final TransportId transportId; + private final InterruptibleConnection conn; + @GuardedBy("lock") + @Nullable + private Priority priority = null; + @GuardedBy("lock") + private boolean interrupted = false; + + private ConnectionRecord(TransportId transportId, + InterruptibleConnection conn) { + this.transportId = transportId; + this.conn = conn; + } + + @Override + public boolean equals(Object o) { + if (o instanceof ConnectionRecord) { + return conn == ((ConnectionRecord) o).conn; + } else { + return false; + } + } + + @Override + public int hashCode() { + return conn.hashCode(); + } + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java index 0ed62ab0b..6053b10c9 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java @@ -1,6 +1,7 @@ package org.briarproject.bramble.connection; import org.briarproject.bramble.api.connection.ConnectionRegistry; +import org.briarproject.bramble.api.connection.InterruptibleConnection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionReader; @@ -9,6 +10,7 @@ import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -21,11 +23,13 @@ import java.io.IOException; import java.util.concurrent.Executor; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; @NotNullByDefault -abstract class DuplexSyncConnection extends SyncConnection { +abstract class DuplexSyncConnection extends SyncConnection + implements InterruptibleConnection { final Executor ioExecutor; final TransportId transportId; @@ -33,8 +37,31 @@ abstract class DuplexSyncConnection extends SyncConnection { final TransportConnectionWriter writer; final TransportProperties remote; + private final Object interruptLock = new Object(); + + @GuardedBy("interruptLock") @Nullable - volatile SyncSession outgoingSession = null; + private SyncSession outgoingSession = null; + @GuardedBy("interruptLock") + private boolean interruptWaiting = false; + + @Override + public void interruptOutgoingSession() { + synchronized (interruptLock) { + if (outgoingSession == null) interruptWaiting = true; + else outgoingSession.interrupt(); + } + } + + void setOutgoingSession(SyncSession outgoingSession) { + synchronized (interruptLock) { + this.outgoingSession = outgoingSession; + if (interruptWaiting) { + outgoingSession.interrupt(); + interruptWaiting = false; + } + } + } DuplexSyncConnection(KeyManager keyManager, ConnectionRegistry connectionRegistry, @@ -57,9 +84,7 @@ abstract class DuplexSyncConnection extends SyncConnection { void onReadError(boolean recognised) { disposeOnError(reader, recognised); disposeOnError(writer); - // Interrupt the outgoing session so it finishes - SyncSession out = outgoingSession; - if (out != null) out.interrupt(); + interruptOutgoingSession(); } void onWriteError() { @@ -68,11 +93,12 @@ abstract class DuplexSyncConnection extends SyncConnection { } SyncSession createDuplexOutgoingSession(StreamContext ctx, - TransportConnectionWriter w) throws IOException { + TransportConnectionWriter w, @Nullable Priority priority) + throws IOException { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); return syncSessionFactory.createDuplexOutgoingSession(c, - w.getMaxLatency(), w.getMaxIdleTime(), streamWriter); + w.getMaxLatency(), w.getMaxIdleTime(), streamWriter, priority); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java index 382b17a50..abf34ca0f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -58,25 +59,28 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection onReadError(true); return; } - connectionRegistry.registerConnection(contactId, transportId, true); + connectionRegistry.registerIncomingConnection(contactId, transportId, + this); // Start the outgoing session on another thread ioExecutor.execute(() -> runOutgoingSession(contactId)); try { // Store any transport properties discovered from the connection transportPropertyManager.addRemotePropertiesFromConnection( contactId, transportId, remote); + // Update the connection registry when we receive our priority + PriorityHandler handler = p -> connectionRegistry.setPriority( + contactId, transportId, this, p); // Create and run the incoming session - createIncomingSession(ctx, reader).run(); + createIncomingSession(ctx, reader, handler).run(); reader.dispose(false, true); - // Interrupt the outgoing session so it finishes cleanly - SyncSession out = outgoingSession; - if (out != null) out.interrupt(); + interruptOutgoingSession(); + connectionRegistry.unregisterConnection(contactId, transportId, + this, true, false); } catch (DbException | IOException e) { logException(LOG, WARNING, e); onReadError(true); - } finally { connectionRegistry.unregisterConnection(contactId, transportId, - true); + this, true, true); } } @@ -90,8 +94,8 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection } try { // Create and run the outgoing session - SyncSession out = createDuplexOutgoingSession(ctx, writer); - outgoingSession = out; + SyncSession out = createDuplexOutgoingSession(ctx, writer, null); + setOutgoingSession(out); out.run(); writer.dispose(false); } catch (IOException e) { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java index 9bf5980f8..b41fb33e0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java @@ -6,6 +6,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamContext; @@ -58,17 +59,16 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable { onError(true); return; } - connectionRegistry.registerConnection(contactId, transportId, true); try { + // We don't expect to receive a priority for this connection + PriorityHandler handler = p -> + LOG.info("Ignoring priority for simplex connection"); // Create and run the incoming session - createIncomingSession(ctx, reader).run(); + createIncomingSession(ctx, reader, handler).run(); reader.dispose(false, true); } catch (IOException e) { logException(LOG, WARNING, e); onError(true); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - true); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java index 0c8f464fb..d7f777b7b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java @@ -7,6 +7,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.Priority; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -15,15 +17,18 @@ import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriterFactory; import java.io.IOException; +import java.security.SecureRandom; import java.util.concurrent.Executor; import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.api.sync.SyncConstants.PRIORITY_NONCE_BYTES; import static org.briarproject.bramble.util.LogUtils.logException; @NotNullByDefault class OutgoingDuplexSyncConnection extends DuplexSyncConnection implements Runnable { + private final SecureRandom secureRandom; private final ContactId contactId; OutgoingDuplexSyncConnection(KeyManager keyManager, @@ -32,11 +37,12 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, - Executor ioExecutor, ContactId contactId, TransportId transportId, - DuplexTransportConnection connection) { + Executor ioExecutor, SecureRandom secureRandom, ContactId contactId, + TransportId transportId, DuplexTransportConnection connection) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager, ioExecutor, transportId, connection); + this.secureRandom = secureRandom; this.contactId = contactId; } @@ -56,11 +62,13 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection return; } // Start the incoming session on another thread - ioExecutor.execute(this::runIncomingSession); + Priority priority = generatePriority(); + ioExecutor.execute(() -> runIncomingSession(priority)); try { // Create and run the outgoing session - SyncSession out = createDuplexOutgoingSession(ctx, writer); - outgoingSession = out; + SyncSession out = + createDuplexOutgoingSession(ctx, writer, priority); + setOutgoingSession(out); out.run(); writer.dispose(false); } catch (IOException e) { @@ -69,7 +77,7 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection } } - private void runIncomingSession() { + private void runIncomingSession(Priority priority) { // Read and recognise the tag StreamContext ctx = recogniseTag(reader, transportId); // Unrecognised tags are suspicious in this case @@ -96,23 +104,26 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection onReadError(); return; } - connectionRegistry.registerConnection(contactId, transportId, false); + connectionRegistry.registerOutgoingConnection(contactId, transportId, + this, priority); try { // Store any transport properties discovered from the connection transportPropertyManager.addRemotePropertiesFromConnection( contactId, transportId, remote); + // We don't expect to receive a priority for this connection + PriorityHandler handler = p -> + LOG.info("Ignoring priority for outgoing connection"); // Create and run the incoming session - createIncomingSession(ctx, reader).run(); + createIncomingSession(ctx, reader, handler).run(); reader.dispose(false, true); - // Interrupt the outgoing session so it finishes cleanly - SyncSession out = outgoingSession; - if (out != null) out.interrupt(); + interruptOutgoingSession(); + connectionRegistry.unregisterConnection(contactId, transportId, + this, false, false); } catch (DbException | IOException e) { logException(LOG, WARNING, e); onReadError(); - } finally { connectionRegistry.unregisterConnection(contactId, transportId, - false); + this, false, true); } } @@ -120,4 +131,10 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection // 'Recognised' is always true for outgoing connections onReadError(true); } + + private Priority generatePriority() { + byte[] nonce = new byte[PRIORITY_NONCE_BYTES]; + secureRandom.nextBytes(nonce); + return new Priority(nonce); + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java index 7136c818f..d91d444c2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java @@ -52,7 +52,6 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { onError(); return; } - connectionRegistry.registerConnection(contactId, transportId, false); try { // Create and run the outgoing session createSimplexOutgoingSession(ctx, writer).run(); @@ -60,9 +59,6 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { } catch (IOException e) { logException(LOG, WARNING, e); onError(); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - false); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java index 40ac2cac2..6de535552 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/SyncConnection.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -52,10 +53,12 @@ class SyncConnection extends Connection { } SyncSession createIncomingSession(StreamContext ctx, - TransportConnectionReader r) throws IOException { + TransportConnectionReader r, PriorityHandler handler) + throws IOException { InputStream streamReader = streamReaderFactory.createStreamReader( r.getInputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); - return syncSessionFactory.createIncomingSession(c, streamReader); + return syncSessionFactory + .createIncomingSession(c, streamReader, handler); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java index c42e49a8c..1aeb66172 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java @@ -98,8 +98,8 @@ class PollerImpl implements Poller, EventListener { ConnectionClosedEvent c = (ConnectionClosedEvent) e; // Reschedule polling, the polling interval may have decreased reschedule(c.getTransportId()); - if (!c.isIncoming()) { - // Connect to the disconnected contact + // If an outgoing connection failed, try to reconnect + if (!c.isIncoming() && c.isException()) { connectToContact(c.getContactId(), c.getTransportId()); } } else if (e instanceof ConnectionOpenedEvent) { @@ -215,7 +215,7 @@ class PollerImpl implements Poller, EventListener { Map remote = transportPropertyManager.getRemoteProperties(t); Collection connected = - connectionRegistry.getConnectedContacts(t); + connectionRegistry.getConnectedOrBetterContacts(t); Collection> properties = new ArrayList<>(); for (Entry e : remote.entrySet()) { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index 0aa1a8985..fc007cd0f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -14,6 +14,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Offer; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; @@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -74,6 +76,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private final int maxLatency, maxIdleTime; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; + @Nullable + private final Priority priority; private final BlockingQueue> writerTasks; private final AtomicBoolean generateAckQueued = new AtomicBoolean(false); @@ -88,7 +92,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, Clock clock, ContactId contactId, int maxLatency, int maxIdleTime, StreamWriter streamWriter, - SyncRecordWriter recordWriter) { + SyncRecordWriter recordWriter, @Nullable Priority priority) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; @@ -98,6 +102,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { this.maxIdleTime = maxIdleTime; this.streamWriter = streamWriter; this.recordWriter = recordWriter; + this.priority = priority; writerTasks = new LinkedBlockingQueue<>(); } @@ -108,6 +113,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { try { // Send our supported protocol versions recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); + // Send our connection priority, if this is an outgoing connection + if (priority != null) recordWriter.writePriority(priority); // Start a query for each type of record generateAck(); generateBatch(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java index e9c17577e..6414b0490 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java @@ -15,6 +15,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Offer; +import org.briarproject.bramble.api.sync.Priority; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.SyncSession; @@ -47,17 +49,19 @@ class IncomingSession implements SyncSession, EventListener { private final EventBus eventBus; private final ContactId contactId; private final SyncRecordReader recordReader; + private final PriorityHandler priorityHandler; private volatile boolean interrupted = false; IncomingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, ContactId contactId, - SyncRecordReader recordReader) { + SyncRecordReader recordReader, PriorityHandler priorityHandler) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; this.contactId = contactId; this.recordReader = recordReader; + this.priorityHandler = priorityHandler; } @IoExecutor @@ -86,6 +90,9 @@ class IncomingSession implements SyncSession, EventListener { } else if (recordReader.hasVersions()) { Versions v = recordReader.readVersions(); dbExecutor.execute(new ReceiveVersions(v)); + } else if (recordReader.hasPriority()) { + Priority p = recordReader.readPriority(); + priorityHandler.handle(p); } else { // unknown records are ignored in RecordReader#eof() throw new FormatException(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordReaderImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordReaderImpl.java index 7ea0f6a41..8d8857460 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordReaderImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordReaderImpl.java @@ -11,6 +11,7 @@ import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.MessageFactory; import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.Offer; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.Versions; @@ -26,10 +27,12 @@ import javax.annotation.concurrent.NotThreadSafe; import static org.briarproject.bramble.api.sync.RecordTypes.ACK; import static org.briarproject.bramble.api.sync.RecordTypes.MESSAGE; import static org.briarproject.bramble.api.sync.RecordTypes.OFFER; +import static org.briarproject.bramble.api.sync.RecordTypes.PRIORITY; import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST; import static org.briarproject.bramble.api.sync.RecordTypes.VERSIONS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_SUPPORTED_VERSIONS; import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; +import static org.briarproject.bramble.api.sync.SyncConstants.PRIORITY_NONCE_BYTES; import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION; @NotThreadSafe @@ -48,7 +51,7 @@ class SyncRecordReaderImpl implements SyncRecordReader { private static boolean isKnownRecordType(byte type) { return type == ACK || type == MESSAGE || type == OFFER || - type == REQUEST || type == VERSIONS; + type == REQUEST || type == VERSIONS || type == PRIORITY; } private final MessageFactory messageFactory; @@ -174,4 +177,23 @@ class SyncRecordReaderImpl implements SyncRecordReader { nextRecord = null; return supported; } + + @Override + public boolean hasPriority() throws IOException { + return !eof() && getNextRecordType() == PRIORITY; + } + + @Override + public Priority readPriority() throws IOException { + if (!hasPriority()) throw new FormatException(); + return new Priority(readNonce()); + } + + private byte[] readNonce() throws IOException { + if (nextRecord == null) throw new AssertionError(); + byte[] payload = nextRecord.getPayload(); + if (payload.length != PRIORITY_NONCE_BYTES) throw new FormatException(); + nextRecord = null; + return payload; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java index 118cc51cd..d2b4e73f1 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java @@ -8,6 +8,7 @@ import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.MessageFactory; import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.Offer; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.Versions; @@ -20,6 +21,7 @@ import javax.annotation.concurrent.NotThreadSafe; import static org.briarproject.bramble.api.sync.RecordTypes.ACK; import static org.briarproject.bramble.api.sync.RecordTypes.MESSAGE; import static org.briarproject.bramble.api.sync.RecordTypes.OFFER; +import static org.briarproject.bramble.api.sync.RecordTypes.PRIORITY; import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST; import static org.briarproject.bramble.api.sync.RecordTypes.VERSIONS; import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION; @@ -73,6 +75,12 @@ class SyncRecordWriterImpl implements SyncRecordWriter { writeRecord(VERSIONS); } + @Override + public void writePriority(Priority p) throws IOException { + writer.writeRecord( + new Record(PROTOCOL_VERSION, PRIORITY, p.getNonce())); + } + @Override public void flush() throws IOException { writer.flush(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index d35e1164b..64bf25c5d 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -5,6 +5,8 @@ import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.Priority; +import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.SyncRecordReaderFactory; import org.briarproject.bramble.api.sync.SyncRecordWriter; @@ -18,6 +20,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.Executor; +import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import javax.inject.Inject; @@ -46,10 +49,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { } @Override - public SyncSession createIncomingSession(ContactId c, InputStream in) { + public SyncSession createIncomingSession(ContactId c, InputStream in, + PriorityHandler handler) { SyncRecordReader recordReader = recordReaderFactory.createRecordReader(in); - return new IncomingSession(db, dbExecutor, eventBus, c, recordReader); + return new IncomingSession(db, dbExecutor, eventBus, c, recordReader, + handler); } @Override @@ -64,11 +69,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { @Override public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, StreamWriter streamWriter) { + int maxIdleTime, StreamWriter streamWriter, + @Nullable Priority priority) { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, - maxLatency, maxIdleTime, streamWriter, recordWriter); + maxLatency, maxIdleTime, streamWriter, recordWriter, priority); } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java index 3edf4ddc8..c8b6de86d 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java @@ -1,9 +1,11 @@ package org.briarproject.bramble.connection; import org.briarproject.bramble.api.connection.ConnectionRegistry; +import org.briarproject.bramble.api.connection.InterruptibleConnection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; @@ -11,6 +13,7 @@ import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent; import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.test.BrambleMockTestCase; import org.jmock.Expectations; import org.junit.Test; @@ -18,10 +21,13 @@ import org.junit.Test; import java.util.Collection; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getTransportId; +import static org.briarproject.bramble.util.StringUtils.fromHexString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -30,21 +36,48 @@ import static org.junit.Assert.fail; public class ConnectionRegistryImplTest extends BrambleMockTestCase { private final EventBus eventBus = context.mock(EventBus.class); + private final PluginConfig pluginConfig = context.mock(PluginConfig.class); + private final InterruptibleConnection conn1 = + context.mock(InterruptibleConnection.class, "conn1"); + private final InterruptibleConnection conn2 = + context.mock(InterruptibleConnection.class, "conn2"); + private final InterruptibleConnection conn3 = + context.mock(InterruptibleConnection.class, "conn3"); - private final ContactId contactId = getContactId(); private final ContactId contactId1 = getContactId(); - private final TransportId transportId = getTransportId(); + private final ContactId contactId2 = getContactId(); private final TransportId transportId1 = getTransportId(); + private final TransportId transportId2 = getTransportId(); + private final TransportId transportId3 = getTransportId(); private final PendingContactId pendingContactId = new PendingContactId(getRandomId()); + private final Priority low = + new Priority(fromHexString("00000000000000000000000000000000")); + private final Priority high = + new Priority(fromHexString("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")); + @Test - public void testRegisterAndUnregister() { - ConnectionRegistry c = new ConnectionRegistryImpl(eventBus); + public void testRegisterMultipleConnections() { + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyMap())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); // The registry should be empty - assertEquals(emptyList(), c.getConnectedContacts(transportId)); assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId1)); + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2)); + assertEquals(emptyList(), c.getConnectedContacts(transportId3)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId3)); + assertFalse(c.isConnected(contactId1)); + assertFalse(c.isConnected(contactId1, transportId1)); + assertFalse(c.isConnected(contactId1, transportId2)); + assertFalse(c.isConnected(contactId1, transportId3)); // Check that a registered connection shows up - this should // broadcast a ConnectionOpenedEvent and a ContactConnectedEvent @@ -52,34 +85,47 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase { oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); }}); - c.registerConnection(contactId, transportId, true); - assertEquals(singletonList(contactId), - c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.registerIncomingConnection(contactId1, transportId1, conn1); context.assertIsSatisfied(); - // Register an identical connection - this should broadcast a - // ConnectionOpenedEvent and lookup should be unaffected + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + assertTrue(c.isConnected(contactId1)); + assertTrue(c.isConnected(contactId1, transportId1)); + + // Register another connection with the same contact and transport - + // this should broadcast a ConnectionOpenedEvent and lookup should be + // unaffected context.checking(new Expectations() {{ oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); }}); - c.registerConnection(contactId, transportId, true); - assertEquals(singletonList(contactId), - c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.registerIncomingConnection(contactId1, transportId1, conn2); context.assertIsSatisfied(); + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + assertTrue(c.isConnected(contactId1)); + assertTrue(c.isConnected(contactId1, transportId1)); + // Unregister one of the connections - this should broadcast a // ConnectionClosedEvent and lookup should be unaffected context.checking(new Expectations() {{ oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); }}); - c.unregisterConnection(contactId, transportId, true); - assertEquals(singletonList(contactId), - c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.unregisterConnection(contactId1, transportId1, conn1, true, false); context.assertIsSatisfied(); + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + assertTrue(c.isConnected(contactId1)); + assertTrue(c.isConnected(contactId1, transportId1)); + // Unregister the other connection - this should broadcast a // ConnectionClosedEvent and a ContactDisconnectedEvent context.checking(new Expectations() {{ @@ -87,42 +133,458 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase { oneOf(eventBus).broadcast(with(any( ContactDisconnectedEvent.class))); }}); - c.unregisterConnection(contactId, transportId, true); - assertEquals(emptyList(), c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.unregisterConnection(contactId1, transportId1, conn2, true, false); context.assertIsSatisfied(); + assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId1)); + assertFalse(c.isConnected(contactId1)); + assertFalse(c.isConnected(contactId1, transportId1)); + // Try to unregister the connection again - exception should be thrown try { - c.unregisterConnection(contactId, transportId, true); + c.unregisterConnection(contactId1, transportId1, conn2, + true, false); fail(); } catch (IllegalArgumentException expected) { // Expected } + } - // Register both contacts with one transport, one contact with both - - // this should broadcast three ConnectionOpenedEvents and two - // ContactConnectedEvents + @Test + public void testRegisterMultipleContacts() { + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyMap())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Register two contacts with one transport, then one of the contacts + // with a second transport - this should broadcast three + // ConnectionOpenedEvents and two ContactConnectedEvents context.checking(new Expectations() {{ exactly(3).of(eventBus).broadcast(with(any( ConnectionOpenedEvent.class))); exactly(2).of(eventBus).broadcast(with(any( ContactConnectedEvent.class))); }}); - c.registerConnection(contactId, transportId, true); - c.registerConnection(contactId1, transportId, true); - c.registerConnection(contactId1, transportId1, true); - Collection connected = c.getConnectedContacts(transportId); + c.registerIncomingConnection(contactId1, transportId1, conn1); + c.registerIncomingConnection(contactId2, transportId1, conn2); + c.registerIncomingConnection(contactId2, transportId2, conn3); + context.assertIsSatisfied(); + + assertTrue(c.isConnected(contactId1)); + assertTrue(c.isConnected(contactId2)); + + assertTrue(c.isConnected(contactId1, transportId1)); + assertFalse(c.isConnected(contactId1, transportId2)); + + assertTrue(c.isConnected(contactId2, transportId1)); + assertTrue(c.isConnected(contactId2, transportId2)); + + Collection connected = c.getConnectedContacts(transportId1); assertEquals(2, connected.size()); - assertTrue(connected.contains(contactId)); assertTrue(connected.contains(contactId1)); + assertTrue(connected.contains(contactId2)); + + connected = c.getConnectedOrBetterContacts(transportId1); + assertEquals(2, connected.size()); + assertTrue(connected.contains(contactId1)); + assertTrue(connected.contains(contactId2)); + + assertEquals(singletonList(contactId2), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId2), + c.getConnectedOrBetterContacts(transportId2)); + } + + @Test + public void testConnectionsAreNotInterruptedUnlessPriorityIsSet() { + // Prefer transport 2 to transport 1 + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue( + singletonMap(transportId1, singletonList(transportId2)))); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Connect via transport 1 (worse than 2) with no priority set + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerIncomingConnection(contactId1, transportId1, conn1); + context.assertIsSatisfied(); + assertEquals(singletonList(contactId1), c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 2 (better than 1) and set priority to high - + // the old connection should not be interrupted, despite using a worse + // transport, to remain compatible with old peers + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId2, conn2, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 3 (no preference) and set priority to high - + // again, no interruptions are expected + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId3, conn3, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + } + + @Test + public void testNewConnectionIsInterruptedIfOldConnectionUsesBetterTransport() { + // Prefer transport 1 to transport 2 + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue( + singletonMap(transportId2, singletonList(transportId1)))); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Connect via transport 1 (better than 2) and set priority to low + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId1, conn1, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // The contact is not connected via transport 2 but is connected via a + // better transport + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 2 (worse than 1) and set priority to high - + // the new connection should be interrupted because it uses a worse + // transport + context.checking(new Expectations() {{ + oneOf(conn2).interruptOutgoingSession(); + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId2, conn2, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 3 (no preference) and set priority to low - + // no further interruptions + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId3, conn3, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + + // Unregister the interrupted connection (transport 2) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); + }}); + c.unregisterConnection(contactId1, transportId2, conn2, true, false); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // The contact is not connected via transport 2 but is connected via a + // better transport + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + } + + @Test + public void testOldConnectionIsInterruptedIfNewConnectionUsesBetterTransport() { + // Prefer transport 2 to transport 1 + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue( + singletonMap(transportId1, singletonList(transportId2)))); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Connect via transport 1 (worse than 2) and set priority to high + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId1, conn1, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 2 (better than 1) and set priority to low - + // the old connection should be interrupted because it uses a worse + // transport + context.checking(new Expectations() {{ + oneOf(conn1).interruptOutgoingSession(); + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId2, conn2, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 3 (no preference) and set priority to high - + // no further interruptions + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId3, conn3, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + + // Unregister the interrupted connection (transport 1) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); + }}); + c.unregisterConnection(contactId1, transportId1, conn1, true, false); + context.assertIsSatisfied(); + + // The contact is not connected via transport 1 but is connected via a + // better transport + assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + } + + @Test + public void testNewConnectionIsInterruptedIfOldConnectionHasHigherPriority() { + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyMap())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Register a connection with high priority + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId1, conn1, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Register another connection via the same transport (no priority yet) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerIncomingConnection(contactId1, transportId1, conn2); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Set the priority of the second connection to low - the second + // connection should be interrupted + context.checking(new Expectations() {{ + oneOf(conn2).interruptOutgoingSession(); + }}); + c.setPriority(contactId1, transportId1, conn2, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Register a third connection with low priority - it should also be + // interrupted + context.checking(new Expectations() {{ + oneOf(conn3).interruptOutgoingSession(); + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId1, conn3, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + } + + @Test + public void testOldConnectionIsInterruptedIfNewConnectionHasHigherPriority() { + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyMap())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Register a connection with low priority + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerOutgoingConnection(contactId1, transportId1, conn1, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Register another connection via the same transport (no priority yet) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerIncomingConnection(contactId1, transportId1, conn2); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Set the priority of the second connection to high - the first + // connection should be interrupted + context.checking(new Expectations() {{ + oneOf(conn1).interruptOutgoingSession(); + }}); + c.setPriority(contactId1, transportId1, conn2, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); } @Test public void testRegisterAndUnregisterPendingContacts() { - ConnectionRegistry c = new ConnectionRegistryImpl(eventBus); + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyMap())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); context.checking(new Expectations() {{ oneOf(eventBus).broadcast(with(any( diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java index 522c9aad9..cf29a7a0e 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java @@ -157,7 +157,21 @@ public class PollerImplTest extends BrambleMockTestCase { } @Test - public void testRescheduleAndReconnectOnConnectionClosed() + public void testRescheduleOnOutgoingConnectionClosed() { + DuplexPlugin plugin = context.mock(DuplexPlugin.class); + + context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + }}); + expectReschedule(plugin); + + poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId, + false, false)); + } + + @Test + public void testRescheduleAndReconnectOnOutgoingConnectionFailed() throws Exception { DuplexPlugin plugin = context.mock(DuplexPlugin.class); DuplexTransportConnection duplexConnection = @@ -166,45 +180,40 @@ public class PollerImplTest extends BrambleMockTestCase { context.checking(new Expectations() {{ allowing(plugin).getId(); will(returnValue(transportId)); - // reschedule() - // Get the plugin - oneOf(pluginManager).getPlugin(transportId); - will(returnValue(plugin)); - // The plugin supports polling - oneOf(plugin).shouldPoll(); - will(returnValue(true)); - // Get the plugin - oneOf(pluginManager).getPlugin(transportId); - will(returnValue(plugin)); - // The plugin supports polling - oneOf(plugin).shouldPoll(); - will(returnValue(true)); - // Schedule the next poll - oneOf(plugin).getPollingInterval(); - will(returnValue(pollingInterval)); - oneOf(clock).currentTimeMillis(); - will(returnValue(now)); - oneOf(scheduler).schedule(with(any(Runnable.class)), - with((long) pollingInterval), with(MILLISECONDS)); - will(returnValue(future)); - // connectToContact() - // Check whether the contact is already connected - oneOf(connectionRegistry).isConnected(contactId, transportId); - will(returnValue(false)); - // Get the transport properties - oneOf(transportPropertyManager).getRemoteProperties(contactId, - transportId); - will(returnValue(properties)); - // Connect to the contact - oneOf(plugin).createConnection(properties); - will(returnValue(duplexConnection)); - // Pass the connection to the connection manager - oneOf(connectionManager).manageOutgoingConnection(contactId, - transportId, duplexConnection); }}); + expectReschedule(plugin); + expectReconnect(plugin, duplexConnection); poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId, - false)); + false, true)); + } + + @Test + public void testRescheduleOnIncomingConnectionClosed() { + DuplexPlugin plugin = context.mock(DuplexPlugin.class); + + context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + }}); + expectReschedule(plugin); + + poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId, + true, false)); + } + + @Test + public void testRescheduleOnIncomingConnectionFailed() { + DuplexPlugin plugin = context.mock(DuplexPlugin.class); + + context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + }}); + expectReschedule(plugin); + + poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId, + true, false)); } @Test @@ -354,7 +363,7 @@ public class PollerImplTest extends BrambleMockTestCase { // Get the transport properties and connected contacts oneOf(transportPropertyManager).getRemoteProperties(transportId); will(returnValue(singletonMap(contactId, properties))); - oneOf(connectionRegistry).getConnectedContacts(transportId); + oneOf(connectionRegistry).getConnectedOrBetterContacts(transportId); will(returnValue(emptyList())); // Poll the plugin oneOf(plugin).poll(with(collectionOf( @@ -397,7 +406,7 @@ public class PollerImplTest extends BrambleMockTestCase { // Get the transport properties and connected contacts oneOf(transportPropertyManager).getRemoteProperties(transportId); will(returnValue(singletonMap(contactId, properties))); - oneOf(connectionRegistry).getConnectedContacts(transportId); + oneOf(connectionRegistry).getConnectedOrBetterContacts(transportId); will(returnValue(singletonList(contactId))); // All contacts are connected, so don't poll the plugin }}); @@ -431,4 +440,48 @@ public class PollerImplTest extends BrambleMockTestCase { poller.eventOccurred(new TransportEnabledEvent(transportId)); poller.eventOccurred(new TransportDisabledEvent(transportId)); } + + private void expectReschedule(Plugin plugin) { + context.checking(new Expectations() {{ + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Schedule the next poll + oneOf(plugin).getPollingInterval(); + will(returnValue(pollingInterval)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(scheduler).schedule(with(any(Runnable.class)), + with((long) pollingInterval), with(MILLISECONDS)); + will(returnValue(future)); + }}); + } + + private void expectReconnect(DuplexPlugin plugin, + DuplexTransportConnection duplexConnection) throws Exception { + context.checking(new Expectations() {{ + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Check whether the contact is already connected + oneOf(connectionRegistry).isConnected(contactId, transportId); + will(returnValue(false)); + // Get the transport properties + oneOf(transportPropertyManager).getRemoteProperties(contactId, + transportId); + will(returnValue(properties)); + // Connect to the contact + oneOf(plugin).createConnection(properties); + will(returnValue(duplexConnection)); + // Pass the connection to the connection manager + oneOf(connectionManager).manageOutgoingConnection(contactId, + transportId, duplexConnection); + }}); + } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncRecordReaderImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncRecordReaderImplTest.java index 667c2b557..740451837 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncRecordReaderImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncRecordReaderImplTest.java @@ -8,6 +8,7 @@ import org.briarproject.bramble.api.record.RecordReader; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.MessageFactory; import org.briarproject.bramble.api.sync.Offer; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.Versions; @@ -24,11 +25,14 @@ import javax.annotation.Nullable; import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES; import static org.briarproject.bramble.api.sync.RecordTypes.ACK; import static org.briarproject.bramble.api.sync.RecordTypes.OFFER; +import static org.briarproject.bramble.api.sync.RecordTypes.PRIORITY; import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST; import static org.briarproject.bramble.api.sync.RecordTypes.VERSIONS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_SUPPORTED_VERSIONS; +import static org.briarproject.bramble.api.sync.SyncConstants.PRIORITY_NONCE_BYTES; import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION; +import static org.briarproject.bramble.test.TestUtils.getRandomBytes; import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -119,6 +123,31 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase { reader.readVersions(); } + @Test(expected = FormatException.class) + public void testFormatExceptionIfPriorityNonceIsTooSmall() + throws Exception { + expectReadRecord(createPriority(PRIORITY_NONCE_BYTES - 1)); + + reader.readPriority(); + } + + @Test(expected = FormatException.class) + public void testFormatExceptionIfPriorityNonceIsTooLarge() + throws Exception { + expectReadRecord(createPriority(PRIORITY_NONCE_BYTES + 1)); + + reader.readPriority(); + } + + @Test + public void testNoFormatExceptionIfPriorityNonceIsCorrectSize() + throws Exception { + expectReadRecord(createPriority(PRIORITY_NONCE_BYTES)); + + Priority priority = reader.readPriority(); + assertEquals(PRIORITY_NONCE_BYTES, priority.getNonce().length); + } + @Test public void testEofReturnsTrueWhenAtEndOfStream() throws Exception { expectReadRecord(createAck()); @@ -173,6 +202,11 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase { return new Record(PROTOCOL_VERSION, VERSIONS, payload); } + private Record createPriority(int nonceBytes) { + byte[] payload = getRandomBytes(nonceBytes); + return new Record(PROTOCOL_VERSION, PRIORITY, payload); + } + private byte[] createPayload() throws Exception { ByteArrayOutputStream payload = new ByteArrayOutputStream(); while (payload.size() + UniqueId.LENGTH <= MAX_RECORD_PAYLOAD_BYTES) { diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestPluginConfigModule.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestPluginConfigModule.java index 689d4c5bd..4b3347884 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestPluginConfigModule.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestPluginConfigModule.java @@ -10,12 +10,15 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import java.util.Collection; +import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import dagger.Module; import dagger.Provides; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.briarproject.bramble.test.TestUtils.getTransportId; @@ -85,6 +88,12 @@ public class TestPluginConfigModule { public boolean shouldPoll() { return false; } + + @Override + public Map> getTransportPreferences() { + return emptyMap(); + } + }; return pluginConfig; } diff --git a/bramble-java/src/main/java/org/briarproject/bramble/plugin/DesktopPluginModule.java b/bramble-java/src/main/java/org/briarproject/bramble/plugin/DesktopPluginModule.java index c00bd99d7..53578ccc4 100644 --- a/bramble-java/src/main/java/org/briarproject/bramble/plugin/DesktopPluginModule.java +++ b/bramble-java/src/main/java/org/briarproject/bramble/plugin/DesktopPluginModule.java @@ -6,7 +6,10 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.ShutdownManager; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.BackoffFactory; +import org.briarproject.bramble.api.plugin.BluetoothConstants; +import org.briarproject.bramble.api.plugin.LanTcpConstants; import org.briarproject.bramble.api.plugin.PluginConfig; +import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory; import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import org.briarproject.bramble.api.reliability.ReliabilityLayerFactory; @@ -17,6 +20,8 @@ import org.briarproject.bramble.plugin.tcp.WanTcpPluginFactory; import java.security.SecureRandom; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; import dagger.Module; @@ -24,6 +29,8 @@ import dagger.Provides; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; @Module public class DesktopPluginModule extends PluginModule { @@ -61,6 +68,13 @@ public class DesktopPluginModule extends PluginModule { public boolean shouldPoll() { return true; } + + @Override + public Map> getTransportPreferences() { + // Prefer LAN to Bluetooth + return singletonMap(BluetoothConstants.ID, + singletonList(LanTcpConstants.ID)); + } }; return pluginConfig; } diff --git a/briar-android/src/androidTestScreenshot/java/org/briarproject/briar/android/SetupDataTest.java b/briar-android/src/androidTestScreenshot/java/org/briarproject/briar/android/SetupDataTest.java index ee47bf86f..140e54231 100644 --- a/briar-android/src/androidTestScreenshot/java/org/briarproject/briar/android/SetupDataTest.java +++ b/briar-android/src/androidTestScreenshot/java/org/briarproject/briar/android/SetupDataTest.java @@ -120,7 +120,8 @@ public class SetupDataTest extends ScreenshotTest { // TODO add messages - connectionRegistry.registerConnection(bob.getId(), ID, true); + connectionRegistry.registerIncomingConnection(bob.getId(), ID, () -> { + }); } } diff --git a/briar-android/src/main/java/org/briarproject/briar/android/AppModule.java b/briar-android/src/main/java/org/briarproject/briar/android/AppModule.java index b9e074897..27a720d7f 100644 --- a/briar-android/src/main/java/org/briarproject/briar/android/AppModule.java +++ b/briar-android/src/main/java/org/briarproject/briar/android/AppModule.java @@ -20,7 +20,10 @@ import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.network.NetworkManager; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.BackoffFactory; +import org.briarproject.bramble.api.plugin.BluetoothConstants; +import org.briarproject.bramble.api.plugin.LanTcpConstants; import org.briarproject.bramble.api.plugin.PluginConfig; +import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory; import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import org.briarproject.bramble.api.reporting.DevConfig; @@ -48,6 +51,8 @@ import java.io.File; import java.security.GeneralSecurityException; import java.security.SecureRandom; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -62,6 +67,8 @@ import static android.content.Context.MODE_PRIVATE; import static android.os.Build.VERSION.SDK_INT; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.briarproject.bramble.api.reporting.ReportingConstants.DEV_ONION_ADDRESS; import static org.briarproject.bramble.api.reporting.ReportingConstants.DEV_PUBLIC_KEY_HEX; import static org.briarproject.briar.android.TestingConstants.IS_DEBUG_BUILD; @@ -153,6 +160,13 @@ public class AppModule { public boolean shouldPoll() { return true; } + + @Override + public Map> getTransportPreferences() { + // Prefer LAN to Bluetooth + return singletonMap(BluetoothConstants.ID, + singletonList(LanTcpConstants.ID)); + } }; return pluginConfig; } diff --git a/briar-headless/src/main/java/org/briarproject/briar/headless/HeadlessModule.kt b/briar-headless/src/main/java/org/briarproject/briar/headless/HeadlessModule.kt index 5f622c83b..c74b36225 100644 --- a/briar-headless/src/main/java/org/briarproject/briar/headless/HeadlessModule.kt +++ b/briar-headless/src/main/java/org/briarproject/briar/headless/HeadlessModule.kt @@ -11,6 +11,7 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor import org.briarproject.bramble.api.network.NetworkManager import org.briarproject.bramble.api.plugin.BackoffFactory import org.briarproject.bramble.api.plugin.PluginConfig +import org.briarproject.bramble.api.plugin.TransportId import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory import org.briarproject.bramble.api.system.Clock @@ -88,6 +89,7 @@ internal class HeadlessModule(private val appDir: File) { override fun getDuplexFactories(): Collection = duplex override fun getSimplexFactories(): Collection = emptyList() override fun shouldPoll(): Boolean = true + override fun getTransportPreferences(): Map> = emptyMap() } } diff --git a/briar-headless/src/test/java/org/briarproject/briar/headless/HeadlessTestModule.kt b/briar-headless/src/test/java/org/briarproject/briar/headless/HeadlessTestModule.kt index e0e70fac2..8482441e5 100644 --- a/briar-headless/src/test/java/org/briarproject/briar/headless/HeadlessTestModule.kt +++ b/briar-headless/src/test/java/org/briarproject/briar/headless/HeadlessTestModule.kt @@ -6,6 +6,7 @@ import dagger.Provides import org.briarproject.bramble.api.FeatureFlags import org.briarproject.bramble.api.db.DatabaseConfig import org.briarproject.bramble.api.plugin.PluginConfig +import org.briarproject.bramble.api.plugin.TransportId import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory import org.briarproject.bramble.network.JavaNetworkModule @@ -55,6 +56,7 @@ internal class HeadlessTestModule(private val appDir: File) { override fun getDuplexFactories(): Collection = emptyList() override fun getSimplexFactories(): Collection = emptyList() override fun shouldPoll(): Boolean = false + override fun getTransportPreferences(): Map> = emptyMap() } }