diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java index 6afc2ee7e..1b552101e 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionRegistry.java @@ -3,6 +3,7 @@ package org.briarproject.bramble.api.connection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; @@ -10,6 +11,7 @@ import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent; import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent; +import org.briarproject.bramble.api.sync.Priority; import java.util.Collection; @@ -21,19 +23,46 @@ public interface ConnectionRegistry { /** * Registers a connection with the given contact over the given transport. + *

+ * If the registry has any connections with the same contact and a + * {@link PluginConfig#getTransportPreferences() worse} transport, those + * connections will be + * {@link InterruptibleConnection#interruptOutgoingSession() interrupted}. + *

+ * If the registry has any connections with the same contact and a better + * transport, the given connection will be interrupted. + *

* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts * {@link ContactConnectedEvent} if this is the only connection with the * contact. */ - void registerConnection(ContactId c, TransportId t, boolean incoming); + void registerConnection(ContactId c, TransportId t, + InterruptibleConnection conn, boolean incoming); /** * Unregisters a connection with the given contact over the given transport. + *

* Broadcasts {@link ConnectionClosedEvent}. Also broadcasts * {@link ContactDisconnectedEvent} if this is the only connection with * the contact. */ - void unregisterConnection(ContactId c, TransportId t, boolean incoming); + void unregisterConnection(ContactId c, TransportId t, + InterruptibleConnection conn, boolean incoming); + + /** + * Sets the {@link Priority priority} of a connection that was previously + * registered via {@link #registerConnection(ContactId, TransportId, + * InterruptibleConnection, boolean)}. + *

+ * If the registry has any connections with the same contact and transport + * and a lower {@link Priority priority}, those connections will be + * {@link InterruptibleConnection#interruptOutgoingSession() interrupted}. + *

+ * If the registry has any connections with the same contact and transport + * and a higher priority, the given connection will be interrupted. + */ + void setPriority(ContactId c, TransportId t, InterruptibleConnection conn, + Priority priority); /** * Returns any contacts that are connected via the given transport. @@ -41,10 +70,10 @@ public interface ConnectionRegistry { Collection getConnectedContacts(TransportId t); /** - * Returns any contacts that are connected via the given transport, or via - * any transport that's preferred to the given transport. + * Returns any contacts that are connected via the given transport or any + * {@link PluginConfig#getTransportPreferences() better} transport. */ - Collection getConnectedOrPreferredContacts(TransportId t); + Collection getConnectedOrBetterContacts(TransportId t); /** * Returns true if the given contact is connected via the given transport. diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/InterruptibleConnection.java b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/InterruptibleConnection.java similarity index 83% rename from bramble-core/src/main/java/org/briarproject/bramble/connection/InterruptibleConnection.java rename to bramble-api/src/main/java/org/briarproject/bramble/api/connection/InterruptibleConnection.java index e9924964d..59b46b1ec 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/InterruptibleConnection.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/InterruptibleConnection.java @@ -1,4 +1,4 @@ -package org.briarproject.bramble.connection; +package org.briarproject.bramble.api.connection; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; @@ -7,7 +7,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; * sync session. */ @NotNullByDefault -interface InterruptibleConnection { +public interface InterruptibleConnection { /** * Interrupts the connection's outgoing sync session. If the underlying diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooser.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooser.java deleted file mode 100644 index 300339878..000000000 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooser.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.briarproject.bramble.connection; - -import org.briarproject.bramble.api.contact.ContactId; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.plugin.TransportId; -import org.briarproject.bramble.api.sync.Priority; - -/** - * Chooses one connection per contact and transport to keep open and closes - * any other connections. - */ -@NotNullByDefault -interface ConnectionChooser { - - /** - * Adds the given connection to the chooser with the given priority. - *

- * If the chooser has a connection with the same contact and transport and - * a lower {@link Priority priority}, that connection will be - * {@link InterruptibleConnection#interruptOutgoingSession() interrupted}. - * If the chooser has a connection with the same contact and transport and - * a higher priority, the newly added connection will be interrupted. - */ - void addConnection(ContactId c, TransportId t, InterruptibleConnection conn, - Priority p); - - /** - * Removes the given connection from the chooser. - */ - void removeConnection(ContactId c, TransportId t, - InterruptibleConnection conn); -} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooserImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooserImpl.java deleted file mode 100644 index b73f74fed..000000000 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionChooserImpl.java +++ /dev/null @@ -1,100 +0,0 @@ -package org.briarproject.bramble.connection; - -import org.briarproject.bramble.api.contact.ContactId; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.plugin.TransportId; -import org.briarproject.bramble.api.sync.Priority; - -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Logger; - -import javax.annotation.concurrent.GuardedBy; -import javax.inject.Inject; - -import static java.util.logging.Logger.getLogger; -import static org.briarproject.bramble.api.Bytes.compare; - -@NotNullByDefault -class ConnectionChooserImpl implements ConnectionChooser { - - private static final Logger LOG = - getLogger(ConnectionChooserImpl.class.getName()); - - private final Object lock = new Object(); - @GuardedBy("lock") - private final Map bestConnections = new HashMap<>(); - - @Inject - ConnectionChooserImpl() { - } - - @Override - public void addConnection(ContactId c, TransportId t, - InterruptibleConnection conn, Priority p) { - InterruptibleConnection close = null; - synchronized (lock) { - Key k = new Key(c, t); - Value best = bestConnections.get(k); - if (best == null) { - bestConnections.put(k, new Value(conn, p)); - } else if (compare(p.getNonce(), best.priority.getNonce()) > 0) { - LOG.info("Found a better connection"); - close = best.connection; - bestConnections.put(k, new Value(conn, p)); - } else { - LOG.info("Already have a better connection"); - close = conn; - } - } - if (close != null) close.interruptOutgoingSession(); - } - - @Override - public void removeConnection(ContactId c, TransportId t, - InterruptibleConnection conn) { - synchronized (lock) { - Key k = new Key(c, t); - Value best = bestConnections.get(k); - if (best.connection == conn) bestConnections.remove(k); - } - } - - private static class Key { - - private final ContactId contactId; - private final TransportId transportId; - - private Key(ContactId contactId, TransportId transportId) { - this.contactId = contactId; - this.transportId = transportId; - } - - @Override - public int hashCode() { - return contactId.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof Key) { - Key k = (Key) o; - return contactId.equals(k.contactId) && - transportId.equals(k.transportId); - } else { - return false; - } - } - } - - private static class Value { - - private final InterruptibleConnection connection; - private final Priority priority; - - private Value(InterruptibleConnection connection, Priority priority) { - this.connection = connection; - this.priority = priority; - } - } -} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java index 71914cb3c..2a50033b1 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java @@ -37,7 +37,6 @@ class ConnectionManagerImpl implements ConnectionManager { private final ContactExchangeManager contactExchangeManager; private final ConnectionRegistry connectionRegistry; private final TransportPropertyManager transportPropertyManager; - private final ConnectionChooser connectionChooser; private final SecureRandom secureRandom; @Inject @@ -49,7 +48,7 @@ class ConnectionManagerImpl implements ConnectionManager { ContactExchangeManager contactExchangeManager, ConnectionRegistry connectionRegistry, TransportPropertyManager transportPropertyManager, - ConnectionChooser connectionChooser, SecureRandom secureRandom) { + SecureRandom secureRandom) { this.ioExecutor = ioExecutor; this.keyManager = keyManager; this.streamReaderFactory = streamReaderFactory; @@ -59,7 +58,6 @@ class ConnectionManagerImpl implements ConnectionManager { this.contactExchangeManager = contactExchangeManager; this.connectionRegistry = connectionRegistry; this.transportPropertyManager = transportPropertyManager; - this.connectionChooser = connectionChooser; this.secureRandom = secureRandom; } @@ -78,7 +76,7 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new IncomingDuplexSyncConnection(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager, ioExecutor, - connectionChooser, t, d)); + t, d)); } @Override @@ -103,7 +101,7 @@ class ConnectionManagerImpl implements ConnectionManager { ioExecutor.execute(new OutgoingDuplexSyncConnection(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager, ioExecutor, - connectionChooser, secureRandom, c, t, d)); + secureRandom, c, t, d)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java index 08f299352..e6b8fee3a 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionModule.java @@ -23,11 +23,4 @@ public class ConnectionModule { ConnectionRegistryImpl connectionRegistry) { return connectionRegistry; } - - @Provides - @Singleton - ConnectionChooser provideConnectionChooser( - ConnectionChooserImpl connectionChooser) { - return connectionChooser; - } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java index 9aa92d646..b7337c7c0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionRegistryImpl.java @@ -1,7 +1,9 @@ package org.briarproject.bramble.connection; +import org.briarproject.bramble.api.Bytes; import org.briarproject.bramble.api.Pair; import org.briarproject.bramble.api.connection.ConnectionRegistry; +import org.briarproject.bramble.api.connection.InterruptibleConnection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.event.EventBus; @@ -14,6 +16,7 @@ import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent; import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent; +import org.briarproject.bramble.api.sync.Priority; import java.util.ArrayList; import java.util.Collection; @@ -25,6 +28,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; @@ -42,7 +46,6 @@ class ConnectionRegistryImpl implements ConnectionRegistry { private final EventBus eventBus; private final Map> betterTransports; - private final Map> worseTransports; private final Object lock = new Object(); @GuardedBy("lock") @@ -54,7 +57,6 @@ class ConnectionRegistryImpl implements ConnectionRegistry { ConnectionRegistryImpl(EventBus eventBus, PluginConfig pluginConfig) { this.eventBus = eventBus; betterTransports = new HashMap<>(); - worseTransports = new HashMap<>(); initTransportPreferences(pluginConfig.getTransportPreferences()); contactConnections = new HashMap<>(); connectedPendingContacts = new HashSet<>(); @@ -71,31 +73,50 @@ class ConnectionRegistryImpl implements ConnectionRegistry { betterTransports.put(worse, betterList); } betterList.add(better); - List worseList = worseTransports.get(better); - if (worseList == null) { - worseList = new ArrayList<>(); - worseTransports.put(better, worseList); - } - worseList.add(worse); } } @Override public void registerConnection(ContactId c, TransportId t, - boolean incoming) { + InterruptibleConnection conn, boolean incoming) { if (LOG.isLoggable(INFO)) { if (incoming) LOG.info("Incoming connection registered: " + t); else LOG.info("Outgoing connection registered: " + t); } - boolean firstConnection = false; + List toInterrupt; + boolean firstConnection = false, interruptNewConnection = false; synchronized (lock) { List recs = contactConnections.get(c); if (recs == null) { recs = new ArrayList<>(); contactConnections.put(c, recs); } - if (recs.isEmpty()) firstConnection = true; - recs.add(new ConnectionRecord(t)); + if (recs.isEmpty()) { + toInterrupt = emptyList(); + firstConnection = true; + } else { + toInterrupt = new ArrayList<>(recs.size()); + for (ConnectionRecord rec : recs) { + int compare = compare(t, rec.transportId); + if (compare == -1) { + // The old connection is better than the new one + interruptNewConnection = true; + } else if (compare == 1 && !rec.interrupted) { + // The new connection is better than the old one + toInterrupt.add(rec.conn); + rec.interrupted = true; + } + } + } + recs.add(new ConnectionRecord(t, conn)); + } + if (interruptNewConnection) { + LOG.info("Interrupting new connection"); + conn.interruptOutgoingSession(); + } + for (InterruptibleConnection old : toInterrupt) { + LOG.info("Interrupting old connection"); + old.interruptOutgoingSession(); } eventBus.broadcast(new ConnectionOpenedEvent(c, t, incoming)); if (firstConnection) { @@ -104,9 +125,61 @@ class ConnectionRegistryImpl implements ConnectionRegistry { } } + private int compare(TransportId a, TransportId b) { + if (getBetterTransports(a).contains(b)) return -1; + else if (getBetterTransports(b).contains(a)) return 1; + else return 0; + } + + private List getBetterTransports(TransportId t) { + List better = betterTransports.get(t); + return better == null ? emptyList() : better; + } + + @Override + public void setPriority(ContactId c, TransportId t, + InterruptibleConnection conn, Priority priority) { + if (LOG.isLoggable(INFO)) LOG.info("Setting connection priority: " + t); + List toInterrupt; + boolean interruptNewConnection = false; + synchronized (lock) { + List recs = contactConnections.get(c); + if (recs == null) throw new IllegalArgumentException(); + toInterrupt = new ArrayList<>(recs.size()); + for (ConnectionRecord rec : recs) { + if (rec.conn == conn) { + // Store the priority of this connection + rec.priority = priority; + } else if (rec.transportId.equals(t)) { + int compare = compare(priority, rec.priority); + if (compare == -1) { + // The old connection is better than the new one + interruptNewConnection = true; + } else if (compare == 1 && !rec.interrupted) { + // The new connection is better than the old one + toInterrupt.add(rec.conn); + rec.interrupted = true; + } + } + } + } + if (interruptNewConnection) { + LOG.info("Interrupting new connection"); + conn.interruptOutgoingSession(); + } + for (InterruptibleConnection old : toInterrupt) { + LOG.info("Interrupting old connection"); + old.interruptOutgoingSession(); + } + } + + private int compare(Priority a, @Nullable Priority b) { + return b == null ? 0 : Bytes.compare(a.getNonce(), b.getNonce()); + } + @Override public void unregisterConnection(ContactId c, TransportId t, - boolean incoming) { + InterruptibleConnection conn, boolean incoming) { if (LOG.isLoggable(INFO)) { if (incoming) LOG.info("Incoming connection unregistered: " + t); else LOG.info("Outgoing connection unregistered: " + t); @@ -114,7 +187,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry { boolean lastConnection = false; synchronized (lock) { List recs = contactConnections.get(c); - if (recs == null || !recs.remove(new ConnectionRecord(t))) + if (recs == null || !recs.remove(new ConnectionRecord(t, conn))) throw new IllegalArgumentException(); if (recs.isEmpty()) lastConnection = true; } @@ -146,8 +219,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry { } @Override - public Collection getConnectedOrPreferredContacts( - TransportId t) { + public Collection getConnectedOrBetterContacts(TransportId t) { synchronized (lock) { List better = betterTransports.get(t); if (better == null) better = emptyList(); @@ -164,7 +236,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry { } if (LOG.isLoggable(INFO)) { LOG.info(contactIds.size() - + " contacts connected or preferred: " + t); + + " contacts connected or better: " + t); } return contactIds; } @@ -208,26 +280,34 @@ class ConnectionRegistryImpl implements ConnectionRegistry { eventBus.broadcast(new RendezvousConnectionClosedEvent(p, success)); } - private static class ConnectionRecord { + private class ConnectionRecord { private final TransportId transportId; + private final InterruptibleConnection conn; + @GuardedBy("lock") + @Nullable + private Priority priority = null; + @GuardedBy("lock") + private boolean interrupted = false; - private ConnectionRecord(TransportId transportId) { + private ConnectionRecord(TransportId transportId, + InterruptibleConnection conn) { this.transportId = transportId; + this.conn = conn; } @Override public boolean equals(Object o) { if (o instanceof ConnectionRecord) { - ConnectionRecord rec = (ConnectionRecord) o; - return transportId.equals(rec.transportId); + return conn == ((ConnectionRecord) o).conn; + } else { + return false; } - return false; } @Override public int hashCode() { - return transportId.hashCode(); + return conn.hashCode(); } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java index 778e7b4d8..6053b10c9 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/DuplexSyncConnection.java @@ -1,6 +1,7 @@ package org.briarproject.bramble.connection; import org.briarproject.bramble.api.connection.ConnectionRegistry; +import org.briarproject.bramble.api.connection.InterruptibleConnection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionReader; @@ -31,7 +32,6 @@ abstract class DuplexSyncConnection extends SyncConnection implements InterruptibleConnection { final Executor ioExecutor; - final ConnectionChooser connectionChooser; final TransportId transportId; final TransportConnectionReader reader; final TransportConnectionWriter writer; @@ -69,13 +69,12 @@ abstract class DuplexSyncConnection extends SyncConnection StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, - Executor ioExecutor, ConnectionChooser connectionChooser, - TransportId transportId, DuplexTransportConnection connection) { + Executor ioExecutor, TransportId transportId, + DuplexTransportConnection connection) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager); this.ioExecutor = ioExecutor; - this.connectionChooser = connectionChooser; this.transportId = transportId; reader = connection.getReader(); writer = connection.getWriter(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java index 5b72532e2..f1b53e768 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingDuplexSyncConnection.java @@ -31,12 +31,11 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, - Executor ioExecutor, ConnectionChooser connectionChooser, - TransportId transportId, DuplexTransportConnection connection) { + Executor ioExecutor, TransportId transportId, + DuplexTransportConnection connection) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, - transportPropertyManager, ioExecutor, connectionChooser, - transportId, connection); + transportPropertyManager, ioExecutor, transportId, connection); } @Override @@ -60,15 +59,16 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection onReadError(true); return; } - connectionRegistry.registerConnection(contactId, transportId, true); + connectionRegistry.registerConnection(contactId, transportId, + this, true); // Start the outgoing session on another thread ioExecutor.execute(() -> runOutgoingSession(contactId)); try { // Store any transport properties discovered from the connection transportPropertyManager.addRemotePropertiesFromConnection( contactId, transportId, remote); - // Add the connection to the chooser when we receive its priority - PriorityHandler handler = p -> connectionChooser.addConnection( + // Update the connection registry when we receive our priority + PriorityHandler handler = p -> connectionRegistry.setPriority( contactId, transportId, this, p); // Create and run the incoming session createIncomingSession(ctx, reader, handler).run(); @@ -79,8 +79,7 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection onReadError(true); } finally { connectionRegistry.unregisterConnection(contactId, transportId, - true); - connectionChooser.removeConnection(contactId, transportId, this); + this, true); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java index 2644d8cf4..b41fb33e0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/IncomingSimplexSyncConnection.java @@ -59,7 +59,6 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable { onError(true); return; } - connectionRegistry.registerConnection(contactId, transportId, true); try { // We don't expect to receive a priority for this connection PriorityHandler handler = p -> @@ -70,9 +69,6 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable { } catch (IOException e) { logException(LOG, WARNING, e); onError(true); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - true); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java index 0da1c4a18..a4d237071 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingDuplexSyncConnection.java @@ -37,13 +37,11 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection StreamWriterFactory streamWriterFactory, SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, - Executor ioExecutor, ConnectionChooser connectionChooser, - SecureRandom secureRandom, ContactId contactId, + Executor ioExecutor, SecureRandom secureRandom, ContactId contactId, TransportId transportId, DuplexTransportConnection connection) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, - transportPropertyManager, ioExecutor, connectionChooser, - transportId, connection); + transportPropertyManager, ioExecutor, transportId, connection); this.secureRandom = secureRandom; this.contactId = contactId; } @@ -106,8 +104,9 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection onReadError(); return; } - connectionRegistry.registerConnection(contactId, transportId, false); - connectionChooser.addConnection(contactId, transportId, this, priority); + connectionRegistry.registerConnection(contactId, transportId, + this, false); + connectionRegistry.setPriority(contactId, transportId, this, priority); try { // Store any transport properties discovered from the connection transportPropertyManager.addRemotePropertiesFromConnection( @@ -124,8 +123,7 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection onReadError(); } finally { connectionRegistry.unregisterConnection(contactId, transportId, - false); - connectionChooser.removeConnection(contactId, transportId, this); + this, false); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java index 7136c818f..d91d444c2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java @@ -52,7 +52,6 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { onError(); return; } - connectionRegistry.registerConnection(contactId, transportId, false); try { // Create and run the outgoing session createSimplexOutgoingSession(ctx, writer).run(); @@ -60,9 +59,6 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { } catch (IOException e) { logException(LOG, WARNING, e); onError(); - } finally { - connectionRegistry.unregisterConnection(contactId, transportId, - false); } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionChooserImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionChooserImplTest.java deleted file mode 100644 index 82ddc02ab..000000000 --- a/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionChooserImplTest.java +++ /dev/null @@ -1,80 +0,0 @@ -package org.briarproject.bramble.connection; - -import org.briarproject.bramble.api.contact.ContactId; -import org.briarproject.bramble.api.plugin.TransportId; -import org.briarproject.bramble.api.sync.Priority; -import org.briarproject.bramble.test.BrambleMockTestCase; -import org.jmock.Expectations; -import org.junit.Before; -import org.junit.Test; - -import static org.briarproject.bramble.api.sync.SyncConstants.PRIORITY_NONCE_BYTES; -import static org.briarproject.bramble.test.TestUtils.getContactId; -import static org.briarproject.bramble.test.TestUtils.getRandomBytes; -import static org.briarproject.bramble.test.TestUtils.getTransportId; -import static org.briarproject.bramble.util.StringUtils.fromHexString; - -public class ConnectionChooserImplTest extends BrambleMockTestCase { - - private final InterruptibleConnection conn1 = - context.mock(InterruptibleConnection.class, "conn1"); - private final InterruptibleConnection conn2 = - context.mock(InterruptibleConnection.class, "conn2"); - - private final ContactId contactId = getContactId(); - private final TransportId transportId = getTransportId(); - - private final Priority low = - new Priority(fromHexString("00000000000000000000000000000000")); - private final Priority high = - new Priority(fromHexString("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")); - - private ConnectionChooserImpl chooser; - - @Before - public void setUp() { - chooser = new ConnectionChooserImpl(); - } - - @Test - public void testOldConnectionIsInterruptedIfNewHasHigherPriority() { - chooser.addConnection(contactId, transportId, conn1, low); - - context.checking(new Expectations() {{ - oneOf(conn1).interruptOutgoingSession(); - }}); - - chooser.addConnection(contactId, transportId, conn2, high); - } - - @Test - public void testNewConnectionIsInterruptedIfOldHasHigherPriority() { - chooser.addConnection(contactId, transportId, conn1, high); - - context.checking(new Expectations() {{ - oneOf(conn2).interruptOutgoingSession(); - }}); - - chooser.addConnection(contactId, transportId, conn2, low); - } - - @Test - public void testConnectionIsNotInterruptedAfterBeingRemoved() { - chooser.addConnection(contactId, transportId, conn1, low); - chooser.removeConnection(contactId, transportId, conn1); - chooser.addConnection(contactId, transportId, conn2, high); - } - - @Test - public void testConnectionIsInterruptedIfAddedTwice() { - chooser.addConnection(contactId, transportId, conn1, - new Priority(getRandomBytes(PRIORITY_NONCE_BYTES))); - - context.checking(new Expectations() {{ - oneOf(conn1).interruptOutgoingSession(); - }}); - - chooser.addConnection(contactId, transportId, conn1, - new Priority(getRandomBytes(PRIORITY_NONCE_BYTES))); - } -} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java index 3b1dcf208..5c361c3d6 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/connection/ConnectionRegistryImplTest.java @@ -1,6 +1,8 @@ package org.briarproject.bramble.connection; +import org.briarproject.bramble.api.Pair; import org.briarproject.bramble.api.connection.ConnectionRegistry; +import org.briarproject.bramble.api.connection.InterruptibleConnection; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContactId; import org.briarproject.bramble.api.event.EventBus; @@ -12,6 +14,7 @@ import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent; import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent; import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent; +import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.test.BrambleMockTestCase; import org.jmock.Expectations; import org.junit.Test; @@ -23,6 +26,7 @@ import static java.util.Collections.singletonList; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getTransportId; +import static org.briarproject.bramble.util.StringUtils.fromHexString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -32,16 +36,28 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase { private final EventBus eventBus = context.mock(EventBus.class); private final PluginConfig pluginConfig = context.mock(PluginConfig.class); + private final InterruptibleConnection conn1 = + context.mock(InterruptibleConnection.class, "conn1"); + private final InterruptibleConnection conn2 = + context.mock(InterruptibleConnection.class, "conn2"); + private final InterruptibleConnection conn3 = + context.mock(InterruptibleConnection.class, "conn3"); - private final ContactId contactId = getContactId(); private final ContactId contactId1 = getContactId(); - private final TransportId transportId = getTransportId(); + private final ContactId contactId2 = getContactId(); private final TransportId transportId1 = getTransportId(); + private final TransportId transportId2 = getTransportId(); + private final TransportId transportId3 = getTransportId(); private final PendingContactId pendingContactId = new PendingContactId(getRandomId()); + private final Priority low = + new Priority(fromHexString("00000000000000000000000000000000")); + private final Priority high = + new Priority(fromHexString("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")); + @Test - public void testRegisterAndUnregister() { + public void testRegisterMultipleConnections() { context.checking(new Expectations() {{ allowing(pluginConfig).getTransportPreferences(); will(returnValue(emptyList())); @@ -51,8 +67,12 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase { new ConnectionRegistryImpl(eventBus, pluginConfig); // The registry should be empty - assertEquals(emptyList(), c.getConnectedContacts(transportId)); assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId1)); + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2)); + assertEquals(emptyList(), c.getConnectedContacts(transportId3)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId3)); // Check that a registered connection shows up - this should // broadcast a ConnectionOpenedEvent and a ContactConnectedEvent @@ -60,34 +80,41 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase { oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); }}); - c.registerConnection(contactId, transportId, true); - assertEquals(singletonList(contactId), - c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.registerConnection(contactId1, transportId1, conn1, true); context.assertIsSatisfied(); - // Register an identical connection - this should broadcast a - // ConnectionOpenedEvent and lookup should be unaffected + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Register another connection with the same contact and transport - + // this should broadcast a ConnectionOpenedEvent and lookup should be + // unaffected context.checking(new Expectations() {{ oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); }}); - c.registerConnection(contactId, transportId, true); - assertEquals(singletonList(contactId), - c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.registerConnection(contactId1, transportId1, conn2, true); context.assertIsSatisfied(); + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + // Unregister one of the connections - this should broadcast a // ConnectionClosedEvent and lookup should be unaffected context.checking(new Expectations() {{ oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); }}); - c.unregisterConnection(contactId, transportId, true); - assertEquals(singletonList(contactId), - c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.unregisterConnection(contactId1, transportId1, conn1, true); context.assertIsSatisfied(); + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + // Unregister the other connection - this should broadcast a // ConnectionClosedEvent and a ContactDisconnectedEvent context.checking(new Expectations() {{ @@ -95,37 +122,363 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase { oneOf(eventBus).broadcast(with(any( ContactDisconnectedEvent.class))); }}); - c.unregisterConnection(contactId, transportId, true); - assertEquals(emptyList(), c.getConnectedContacts(transportId)); - assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + c.unregisterConnection(contactId1, transportId1, conn2, true); context.assertIsSatisfied(); + assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId1)); + // Try to unregister the connection again - exception should be thrown try { - c.unregisterConnection(contactId, transportId, true); + c.unregisterConnection(contactId1, transportId1, conn2, true); fail(); } catch (IllegalArgumentException expected) { // Expected } + } - // Register both contacts with one transport, one contact with both - - // this should broadcast three ConnectionOpenedEvents and two - // ContactConnectedEvents + @Test + public void testRegisterMultipleContacts() { + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyList())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Register two contacts with one transport, then one of the contacts + // with a second transport - this should broadcast three + // ConnectionOpenedEvents and two ContactConnectedEvents context.checking(new Expectations() {{ exactly(3).of(eventBus).broadcast(with(any( ConnectionOpenedEvent.class))); exactly(2).of(eventBus).broadcast(with(any( ContactConnectedEvent.class))); }}); - c.registerConnection(contactId, transportId, true); - c.registerConnection(contactId1, transportId, true); - c.registerConnection(contactId1, transportId1, true); - Collection connected = c.getConnectedContacts(transportId); + c.registerConnection(contactId1, transportId1, conn1, true); + c.registerConnection(contactId2, transportId1, conn2, true); + c.registerConnection(contactId2, transportId2, conn3, true); + context.assertIsSatisfied(); + + Collection connected = c.getConnectedContacts(transportId1); assertEquals(2, connected.size()); - assertTrue(connected.contains(contactId)); assertTrue(connected.contains(contactId1)); + assertTrue(connected.contains(contactId2)); + + connected = c.getConnectedOrBetterContacts(transportId1); + assertEquals(2, connected.size()); + assertTrue(connected.contains(contactId1)); + assertTrue(connected.contains(contactId2)); + + assertEquals(singletonList(contactId2), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId2), + c.getConnectedOrBetterContacts(transportId2)); + } + + @Test + public void testNewConnectionIsInterruptedIfOldConnectionUsesBetterTransport() { + // Prefer transport 1 to transport 2 + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue( + singletonList(new Pair<>(transportId1, transportId2)))); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Connect via transport 1 (better than 2) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerConnection(contactId1, transportId1, conn1, true); + context.assertIsSatisfied(); + assertEquals(singletonList(contactId1), c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // The contact is not connected via transport 2 but is connected via a + // better transport + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 2 (worse than 1) - the new connection should + // be interrupted + context.checking(new Expectations() {{ + oneOf(conn2).interruptOutgoingSession(); + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerConnection(contactId1, transportId2, conn2, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 3 (no preference) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerConnection(contactId1, transportId3, conn3, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + + // Unregister the interrupted connection (transport 2) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); + }}); + c.unregisterConnection(contactId1, transportId2, conn2, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // The contact is not connected via transport 2 but is connected via a + // better transport + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + } + + @Test + public void testOldConnectionIsInterruptedIfNewConnectionUsesBetterTransport() { + // Prefer transport 2 to transport 1 + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue( + singletonList(new Pair<>(transportId2, transportId1)))); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Connect via transport 1 (worse than 2) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerConnection(contactId1, transportId1, conn1, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(emptyList(), c.getConnectedContacts(transportId2)); + assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 2 (better than 1) - the old connection should + // be interrupted + context.checking(new Expectations() {{ + oneOf(conn1).interruptOutgoingSession(); + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerConnection(contactId1, transportId2, conn2, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + // Connect via transport 3 (no preference) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerConnection(contactId1, transportId3, conn3, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + + // Unregister the interrupted connection (transport 1) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); + }}); + c.unregisterConnection(contactId1, transportId1, conn1, true); + context.assertIsSatisfied(); + + // The contact is not connected via transport 1 but is connected via a + // better transport + assertEquals(emptyList(), c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId2)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId2)); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId3)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId3)); + } + + @Test + public void testNewConnectionIsInterruptedIfOldConnectionHasHigherPriority() { + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyList())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Register a connection with high priority + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerConnection(contactId1, transportId1, conn1, true); + c.setPriority(contactId1, transportId1, conn1, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Register another connection via the same transport (no priority yet) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerConnection(contactId1, transportId1, conn2, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Set the priority of the second connection to low - the second + // connection should be interrupted + context.checking(new Expectations() {{ + oneOf(conn2).interruptOutgoingSession(); + }}); + c.setPriority(contactId1, transportId1, conn2, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Register a third connection with low priority - it should also be + // interrupted + context.checking(new Expectations() {{ + oneOf(conn3).interruptOutgoingSession(); + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerConnection(contactId1, transportId1, conn3, true); + c.setPriority(contactId1, transportId1, conn3, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + } + + @Test + public void testOldConnectionIsInterruptedIfNewConnectionHasHigherPriority() { + context.checking(new Expectations() {{ + allowing(pluginConfig).getTransportPreferences(); + will(returnValue(emptyList())); + }}); + + ConnectionRegistry c = + new ConnectionRegistryImpl(eventBus, pluginConfig); + + // Register a connection with low priority + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); + }}); + c.registerConnection(contactId1, transportId1, conn1, true); + c.setPriority(contactId1, transportId1, conn1, low); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Register another connection via the same transport (no priority yet) + context.checking(new Expectations() {{ + oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); + }}); + c.registerConnection(contactId1, transportId1, conn2, true); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); + + // Set the priority of the second connection to high - the first + // connection should be interrupted + context.checking(new Expectations() {{ + oneOf(conn1).interruptOutgoingSession(); + }}); + c.setPriority(contactId1, transportId1, conn2, high); + context.assertIsSatisfied(); + + assertEquals(singletonList(contactId1), + c.getConnectedContacts(transportId1)); + assertEquals(singletonList(contactId1), + c.getConnectedOrBetterContacts(transportId1)); } @Test