Combine connection chooser with connection registry.

This commit is contained in:
akwizgran
2020-05-25 16:42:01 +01:00
parent 36747acac1
commit 7d6b65913a
14 changed files with 542 additions and 313 deletions

View File

@@ -1,32 +0,0 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Priority;
/**
* Chooses one connection per contact and transport to keep open and closes
* any other connections.
*/
@NotNullByDefault
interface ConnectionChooser {
/**
* Adds the given connection to the chooser with the given priority.
* <p>
* If the chooser has a connection with the same contact and transport and
* a lower {@link Priority priority}, that connection will be
* {@link InterruptibleConnection#interruptOutgoingSession() interrupted}.
* If the chooser has a connection with the same contact and transport and
* a higher priority, the newly added connection will be interrupted.
*/
void addConnection(ContactId c, TransportId t, InterruptibleConnection conn,
Priority p);
/**
* Removes the given connection from the chooser.
*/
void removeConnection(ContactId c, TransportId t,
InterruptibleConnection conn);
}

View File

@@ -1,100 +0,0 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Priority;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.Bytes.compare;
@NotNullByDefault
class ConnectionChooserImpl implements ConnectionChooser {
private static final Logger LOG =
getLogger(ConnectionChooserImpl.class.getName());
private final Object lock = new Object();
@GuardedBy("lock")
private final Map<Key, Value> bestConnections = new HashMap<>();
@Inject
ConnectionChooserImpl() {
}
@Override
public void addConnection(ContactId c, TransportId t,
InterruptibleConnection conn, Priority p) {
InterruptibleConnection close = null;
synchronized (lock) {
Key k = new Key(c, t);
Value best = bestConnections.get(k);
if (best == null) {
bestConnections.put(k, new Value(conn, p));
} else if (compare(p.getNonce(), best.priority.getNonce()) > 0) {
LOG.info("Found a better connection");
close = best.connection;
bestConnections.put(k, new Value(conn, p));
} else {
LOG.info("Already have a better connection");
close = conn;
}
}
if (close != null) close.interruptOutgoingSession();
}
@Override
public void removeConnection(ContactId c, TransportId t,
InterruptibleConnection conn) {
synchronized (lock) {
Key k = new Key(c, t);
Value best = bestConnections.get(k);
if (best.connection == conn) bestConnections.remove(k);
}
}
private static class Key {
private final ContactId contactId;
private final TransportId transportId;
private Key(ContactId contactId, TransportId transportId) {
this.contactId = contactId;
this.transportId = transportId;
}
@Override
public int hashCode() {
return contactId.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof Key) {
Key k = (Key) o;
return contactId.equals(k.contactId) &&
transportId.equals(k.transportId);
} else {
return false;
}
}
}
private static class Value {
private final InterruptibleConnection connection;
private final Priority priority;
private Value(InterruptibleConnection connection, Priority priority) {
this.connection = connection;
this.priority = priority;
}
}
}

View File

@@ -37,7 +37,6 @@ class ConnectionManagerImpl implements ConnectionManager {
private final ContactExchangeManager contactExchangeManager;
private final ConnectionRegistry connectionRegistry;
private final TransportPropertyManager transportPropertyManager;
private final ConnectionChooser connectionChooser;
private final SecureRandom secureRandom;
@Inject
@@ -49,7 +48,7 @@ class ConnectionManagerImpl implements ConnectionManager {
ContactExchangeManager contactExchangeManager,
ConnectionRegistry connectionRegistry,
TransportPropertyManager transportPropertyManager,
ConnectionChooser connectionChooser, SecureRandom secureRandom) {
SecureRandom secureRandom) {
this.ioExecutor = ioExecutor;
this.keyManager = keyManager;
this.streamReaderFactory = streamReaderFactory;
@@ -59,7 +58,6 @@ class ConnectionManagerImpl implements ConnectionManager {
this.contactExchangeManager = contactExchangeManager;
this.connectionRegistry = connectionRegistry;
this.transportPropertyManager = transportPropertyManager;
this.connectionChooser = connectionChooser;
this.secureRandom = secureRandom;
}
@@ -78,7 +76,7 @@ class ConnectionManagerImpl implements ConnectionManager {
ioExecutor.execute(new IncomingDuplexSyncConnection(keyManager,
connectionRegistry, streamReaderFactory, streamWriterFactory,
syncSessionFactory, transportPropertyManager, ioExecutor,
connectionChooser, t, d));
t, d));
}
@Override
@@ -103,7 +101,7 @@ class ConnectionManagerImpl implements ConnectionManager {
ioExecutor.execute(new OutgoingDuplexSyncConnection(keyManager,
connectionRegistry, streamReaderFactory, streamWriterFactory,
syncSessionFactory, transportPropertyManager, ioExecutor,
connectionChooser, secureRandom, c, t, d));
secureRandom, c, t, d));
}
@Override

View File

@@ -23,11 +23,4 @@ public class ConnectionModule {
ConnectionRegistryImpl connectionRegistry) {
return connectionRegistry;
}
@Provides
@Singleton
ConnectionChooser provideConnectionChooser(
ConnectionChooserImpl connectionChooser) {
return connectionChooser;
}
}

View File

@@ -1,7 +1,9 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.Bytes;
import org.briarproject.bramble.api.Pair;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.connection.InterruptibleConnection;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.event.EventBus;
@@ -14,6 +16,7 @@ import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent;
import org.briarproject.bramble.api.sync.Priority;
import java.util.ArrayList;
import java.util.Collection;
@@ -25,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
@@ -42,7 +46,6 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
private final EventBus eventBus;
private final Map<TransportId, List<TransportId>> betterTransports;
private final Map<TransportId, List<TransportId>> worseTransports;
private final Object lock = new Object();
@GuardedBy("lock")
@@ -54,7 +57,6 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
ConnectionRegistryImpl(EventBus eventBus, PluginConfig pluginConfig) {
this.eventBus = eventBus;
betterTransports = new HashMap<>();
worseTransports = new HashMap<>();
initTransportPreferences(pluginConfig.getTransportPreferences());
contactConnections = new HashMap<>();
connectedPendingContacts = new HashSet<>();
@@ -71,31 +73,50 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
betterTransports.put(worse, betterList);
}
betterList.add(better);
List<TransportId> worseList = worseTransports.get(better);
if (worseList == null) {
worseList = new ArrayList<>();
worseTransports.put(better, worseList);
}
worseList.add(worse);
}
}
@Override
public void registerConnection(ContactId c, TransportId t,
boolean incoming) {
InterruptibleConnection conn, boolean incoming) {
if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection registered: " + t);
else LOG.info("Outgoing connection registered: " + t);
}
boolean firstConnection = false;
List<InterruptibleConnection> toInterrupt;
boolean firstConnection = false, interruptNewConnection = false;
synchronized (lock) {
List<ConnectionRecord> recs = contactConnections.get(c);
if (recs == null) {
recs = new ArrayList<>();
contactConnections.put(c, recs);
}
if (recs.isEmpty()) firstConnection = true;
recs.add(new ConnectionRecord(t));
if (recs.isEmpty()) {
toInterrupt = emptyList();
firstConnection = true;
} else {
toInterrupt = new ArrayList<>(recs.size());
for (ConnectionRecord rec : recs) {
int compare = compare(t, rec.transportId);
if (compare == -1) {
// The old connection is better than the new one
interruptNewConnection = true;
} else if (compare == 1 && !rec.interrupted) {
// The new connection is better than the old one
toInterrupt.add(rec.conn);
rec.interrupted = true;
}
}
}
recs.add(new ConnectionRecord(t, conn));
}
if (interruptNewConnection) {
LOG.info("Interrupting new connection");
conn.interruptOutgoingSession();
}
for (InterruptibleConnection old : toInterrupt) {
LOG.info("Interrupting old connection");
old.interruptOutgoingSession();
}
eventBus.broadcast(new ConnectionOpenedEvent(c, t, incoming));
if (firstConnection) {
@@ -104,9 +125,61 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
}
private int compare(TransportId a, TransportId b) {
if (getBetterTransports(a).contains(b)) return -1;
else if (getBetterTransports(b).contains(a)) return 1;
else return 0;
}
private List<TransportId> getBetterTransports(TransportId t) {
List<TransportId> better = betterTransports.get(t);
return better == null ? emptyList() : better;
}
@Override
public void setPriority(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority) {
if (LOG.isLoggable(INFO)) LOG.info("Setting connection priority: " + t);
List<InterruptibleConnection> toInterrupt;
boolean interruptNewConnection = false;
synchronized (lock) {
List<ConnectionRecord> recs = contactConnections.get(c);
if (recs == null) throw new IllegalArgumentException();
toInterrupt = new ArrayList<>(recs.size());
for (ConnectionRecord rec : recs) {
if (rec.conn == conn) {
// Store the priority of this connection
rec.priority = priority;
} else if (rec.transportId.equals(t)) {
int compare = compare(priority, rec.priority);
if (compare == -1) {
// The old connection is better than the new one
interruptNewConnection = true;
} else if (compare == 1 && !rec.interrupted) {
// The new connection is better than the old one
toInterrupt.add(rec.conn);
rec.interrupted = true;
}
}
}
}
if (interruptNewConnection) {
LOG.info("Interrupting new connection");
conn.interruptOutgoingSession();
}
for (InterruptibleConnection old : toInterrupt) {
LOG.info("Interrupting old connection");
old.interruptOutgoingSession();
}
}
private int compare(Priority a, @Nullable Priority b) {
return b == null ? 0 : Bytes.compare(a.getNonce(), b.getNonce());
}
@Override
public void unregisterConnection(ContactId c, TransportId t,
boolean incoming) {
InterruptibleConnection conn, boolean incoming) {
if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection unregistered: " + t);
else LOG.info("Outgoing connection unregistered: " + t);
@@ -114,7 +187,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
boolean lastConnection = false;
synchronized (lock) {
List<ConnectionRecord> recs = contactConnections.get(c);
if (recs == null || !recs.remove(new ConnectionRecord(t)))
if (recs == null || !recs.remove(new ConnectionRecord(t, conn)))
throw new IllegalArgumentException();
if (recs.isEmpty()) lastConnection = true;
}
@@ -146,8 +219,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
@Override
public Collection<ContactId> getConnectedOrPreferredContacts(
TransportId t) {
public Collection<ContactId> getConnectedOrBetterContacts(TransportId t) {
synchronized (lock) {
List<TransportId> better = betterTransports.get(t);
if (better == null) better = emptyList();
@@ -164,7 +236,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
if (LOG.isLoggable(INFO)) {
LOG.info(contactIds.size()
+ " contacts connected or preferred: " + t);
+ " contacts connected or better: " + t);
}
return contactIds;
}
@@ -208,26 +280,34 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
eventBus.broadcast(new RendezvousConnectionClosedEvent(p, success));
}
private static class ConnectionRecord {
private class ConnectionRecord {
private final TransportId transportId;
private final InterruptibleConnection conn;
@GuardedBy("lock")
@Nullable
private Priority priority = null;
@GuardedBy("lock")
private boolean interrupted = false;
private ConnectionRecord(TransportId transportId) {
private ConnectionRecord(TransportId transportId,
InterruptibleConnection conn) {
this.transportId = transportId;
this.conn = conn;
}
@Override
public boolean equals(Object o) {
if (o instanceof ConnectionRecord) {
ConnectionRecord rec = (ConnectionRecord) o;
return transportId.equals(rec.transportId);
return conn == ((ConnectionRecord) o).conn;
} else {
return false;
}
return false;
}
@Override
public int hashCode() {
return transportId.hashCode();
return conn.hashCode();
}
}
}

View File

@@ -1,6 +1,7 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.connection.InterruptibleConnection;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
@@ -31,7 +32,6 @@ abstract class DuplexSyncConnection extends SyncConnection
implements InterruptibleConnection {
final Executor ioExecutor;
final ConnectionChooser connectionChooser;
final TransportId transportId;
final TransportConnectionReader reader;
final TransportConnectionWriter writer;
@@ -69,13 +69,12 @@ abstract class DuplexSyncConnection extends SyncConnection
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
TransportPropertyManager transportPropertyManager,
Executor ioExecutor, ConnectionChooser connectionChooser,
TransportId transportId, DuplexTransportConnection connection) {
Executor ioExecutor, TransportId transportId,
DuplexTransportConnection connection) {
super(keyManager, connectionRegistry, streamReaderFactory,
streamWriterFactory, syncSessionFactory,
transportPropertyManager);
this.ioExecutor = ioExecutor;
this.connectionChooser = connectionChooser;
this.transportId = transportId;
reader = connection.getReader();
writer = connection.getWriter();

View File

@@ -31,12 +31,11 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
TransportPropertyManager transportPropertyManager,
Executor ioExecutor, ConnectionChooser connectionChooser,
TransportId transportId, DuplexTransportConnection connection) {
Executor ioExecutor, TransportId transportId,
DuplexTransportConnection connection) {
super(keyManager, connectionRegistry, streamReaderFactory,
streamWriterFactory, syncSessionFactory,
transportPropertyManager, ioExecutor, connectionChooser,
transportId, connection);
transportPropertyManager, ioExecutor, transportId, connection);
}
@Override
@@ -60,15 +59,16 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
onReadError(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId, true);
connectionRegistry.registerConnection(contactId, transportId,
this, true);
// Start the outgoing session on another thread
ioExecutor.execute(() -> runOutgoingSession(contactId));
try {
// Store any transport properties discovered from the connection
transportPropertyManager.addRemotePropertiesFromConnection(
contactId, transportId, remote);
// Add the connection to the chooser when we receive its priority
PriorityHandler handler = p -> connectionChooser.addConnection(
// Update the connection registry when we receive our priority
PriorityHandler handler = p -> connectionRegistry.setPriority(
contactId, transportId, this, p);
// Create and run the incoming session
createIncomingSession(ctx, reader, handler).run();
@@ -79,8 +79,7 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
onReadError(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
true);
connectionChooser.removeConnection(contactId, transportId, this);
this, true);
}
}

View File

@@ -59,7 +59,6 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable {
onError(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId, true);
try {
// We don't expect to receive a priority for this connection
PriorityHandler handler = p ->
@@ -70,9 +69,6 @@ class IncomingSimplexSyncConnection extends SyncConnection implements Runnable {
} catch (IOException e) {
logException(LOG, WARNING, e);
onError(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
true);
}
}

View File

@@ -1,19 +0,0 @@
package org.briarproject.bramble.connection;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
/**
* A duplex sync connection that can be closed by interrupting its outgoing
* sync session.
*/
@NotNullByDefault
interface InterruptibleConnection {
/**
* Interrupts the connection's outgoing sync session. If the underlying
* transport connection is alive and the remote peer is cooperative, this
* should result in both sync sessions ending and the connection being
* cleanly closed.
*/
void interruptOutgoingSession();
}

View File

@@ -37,13 +37,11 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
TransportPropertyManager transportPropertyManager,
Executor ioExecutor, ConnectionChooser connectionChooser,
SecureRandom secureRandom, ContactId contactId,
Executor ioExecutor, SecureRandom secureRandom, ContactId contactId,
TransportId transportId, DuplexTransportConnection connection) {
super(keyManager, connectionRegistry, streamReaderFactory,
streamWriterFactory, syncSessionFactory,
transportPropertyManager, ioExecutor, connectionChooser,
transportId, connection);
transportPropertyManager, ioExecutor, transportId, connection);
this.secureRandom = secureRandom;
this.contactId = contactId;
}
@@ -106,8 +104,9 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
onReadError();
return;
}
connectionRegistry.registerConnection(contactId, transportId, false);
connectionChooser.addConnection(contactId, transportId, this, priority);
connectionRegistry.registerConnection(contactId, transportId,
this, false);
connectionRegistry.setPriority(contactId, transportId, this, priority);
try {
// Store any transport properties discovered from the connection
transportPropertyManager.addRemotePropertiesFromConnection(
@@ -124,8 +123,7 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
onReadError();
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
false);
connectionChooser.removeConnection(contactId, transportId, this);
this, false);
}
}

View File

@@ -52,7 +52,6 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
onError();
return;
}
connectionRegistry.registerConnection(contactId, transportId, false);
try {
// Create and run the outgoing session
createSimplexOutgoingSession(ctx, writer).run();
@@ -60,9 +59,6 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
} catch (IOException e) {
logException(LOG, WARNING, e);
onError();
} finally {
connectionRegistry.unregisterConnection(contactId, transportId,
false);
}
}