Use separate methods for registering incoming and outgoing connections.

This commit is contained in:
akwizgran
2020-06-26 09:59:03 +01:00
parent 95f427863d
commit 7736a3b6fc
5 changed files with 76 additions and 41 deletions

View File

@@ -22,14 +22,42 @@ import java.util.Collection;
public interface ConnectionRegistry { public interface ConnectionRegistry {
/** /**
* Registers a connection with the given contact over the given transport. * Registers an incoming connection from the given contact over the given
* transport. The connection's {@link Priority priority} can be set later
* via {@link #setPriority(ContactId, TransportId, InterruptibleConnection,
* Priority)} if a priority record is received from the contact.
* <p> * <p>
* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts * Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts
* {@link ContactConnectedEvent} if this is the only connection with the * {@link ContactConnectedEvent} if this is the only connection with the
* contact. * contact.
*/ */
void registerConnection(ContactId c, TransportId t, void registerIncomingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming); InterruptibleConnection conn);
/**
* Registers an outgoing connection to the given contact over the given
* transport.
* <p>
* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts
* {@link ContactConnectedEvent} if this is the only connection with the
* contact.
* <p>
* If the registry has any "better" connections with the given contact, the
* given connection will be interrupted. If the registry has any "worse"
* connections with the given contact, those connections will be
* interrupted.
* <p>
* Connection A is considered "better" than connection B if both
* connections have had their priorities set, and either A's transport is
* {@link PluginConfig#getTransportPreferences() preferred} to B's, or
* they use the same transport and A has higher {@link Priority priority}
* than B.
* <p>
* For backward compatibility, connections without priorities are not
* considered better or worse than other connections.
*/
void registerOutgoingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority);
/** /**
* Unregisters a connection with the given contact over the given transport. * Unregisters a connection with the given contact over the given transport.
@@ -43,8 +71,8 @@ public interface ConnectionRegistry {
/** /**
* Sets the {@link Priority priority} of a connection that was previously * Sets the {@link Priority priority} of a connection that was previously
* registered via {@link #registerConnection(ContactId, TransportId, * registered via {@link #registerIncomingConnection(ContactId, TransportId,
* InterruptibleConnection, boolean)}. * InterruptibleConnection)}.
* <p> * <p>
* If the registry has any "better" connections with the given contact, the * If the registry has any "better" connections with the given contact, the
* given connection will be interrupted. If the registry has any "worse" * given connection will be interrupted. If the registry has any "worse"

View File

@@ -61,7 +61,25 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
} }
@Override @Override
public void registerConnection(ContactId c, TransportId t, public void registerIncomingConnection(ContactId c, TransportId t,
InterruptibleConnection conn) {
if (LOG.isLoggable(INFO)) {
LOG.info("Incoming connection registered: " + t);
}
registerConnection(c, t, conn, true);
}
@Override
public void registerOutgoingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority) {
if (LOG.isLoggable(INFO)) {
LOG.info("Outgoing connection registered: " + t);
}
registerConnection(c, t, conn, false);
setPriority(c, t, conn, priority);
}
private void registerConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming) { InterruptibleConnection conn, boolean incoming) {
if (LOG.isLoggable(INFO)) { if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection registered: " + t); if (incoming) LOG.info("Incoming connection registered: " + t);

View File

@@ -59,8 +59,8 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
onReadError(true); onReadError(true);
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId, connectionRegistry.registerIncomingConnection(contactId, transportId,
this, true); this);
// Start the outgoing session on another thread // Start the outgoing session on another thread
ioExecutor.execute(() -> runOutgoingSession(contactId)); ioExecutor.execute(() -> runOutgoingSession(contactId));
try { try {

View File

@@ -104,9 +104,8 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
onReadError(); onReadError();
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId, connectionRegistry.registerOutgoingConnection(contactId, transportId,
this, false); this, priority);
connectionRegistry.setPriority(contactId, transportId, this, priority);
try { try {
// Store any transport properties discovered from the connection // Store any transport properties discovered from the connection
transportPropertyManager.addRemotePropertiesFromConnection( transportPropertyManager.addRemotePropertiesFromConnection(

View File

@@ -85,7 +85,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerIncomingConnection(contactId1, transportId1, conn1);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -101,7 +101,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn2, true); c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -170,9 +170,9 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
exactly(2).of(eventBus).broadcast(with(any( exactly(2).of(eventBus).broadcast(with(any(
ContactConnectedEvent.class))); ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerIncomingConnection(contactId1, transportId1, conn1);
c.registerConnection(contactId2, transportId1, conn2, true); c.registerIncomingConnection(contactId2, transportId1, conn2);
c.registerConnection(contactId2, transportId2, conn3, true); c.registerIncomingConnection(contactId2, transportId2, conn3);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertTrue(c.isConnected(contactId1)); assertTrue(c.isConnected(contactId1));
@@ -212,12 +212,12 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
ConnectionRegistry c = ConnectionRegistry c =
new ConnectionRegistryImpl(eventBus, pluginConfig); new ConnectionRegistryImpl(eventBus, pluginConfig);
// Connect via transport 1 (worse than 2) and set priority to low // Connect via transport 1 (worse than 2) with no priority set
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerIncomingConnection(contactId1, transportId1, conn1);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -234,8 +234,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId2, conn2, true); c.registerOutgoingConnection(contactId1, transportId2, conn2, high);
c.setPriority(contactId1, transportId2, conn2, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -253,8 +252,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId3, conn3, true); c.registerOutgoingConnection(contactId1, transportId3, conn3, high);
c.setPriority(contactId1, transportId3, conn3, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -290,8 +288,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, low);
c.setPriority(contactId1, transportId1, conn1, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -312,8 +309,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn2).interruptOutgoingSession(); oneOf(conn2).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId2, conn2, true); c.registerOutgoingConnection(contactId1, transportId2, conn2, high);
c.setPriority(contactId1, transportId2, conn2, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -331,8 +327,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId3, conn3, true); c.registerOutgoingConnection(contactId1, transportId3, conn3, low);
c.setPriority(contactId1, transportId3, conn3, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -391,8 +386,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, high);
c.setPriority(contactId1, transportId1, conn1, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -410,8 +404,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn1).interruptOutgoingSession(); oneOf(conn1).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId2, conn2, true); c.registerOutgoingConnection(contactId1, transportId2, conn2, low);
c.setPriority(contactId1, transportId2, conn2, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -429,7 +422,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId3, conn3, true); c.registerOutgoingConnection(contactId1, transportId3, conn3, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -486,8 +479,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, high);
c.setPriority(contactId1, transportId1, conn1, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -499,7 +491,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn2, true); c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -526,8 +518,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn3).interruptOutgoingSession(); oneOf(conn3).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn3, true); c.registerOutgoingConnection(contactId1, transportId1, conn3, low);
c.setPriority(contactId1, transportId1, conn3, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -551,8 +542,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, low);
c.setPriority(contactId1, transportId1, conn1, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -564,7 +554,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn2, true); c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),