Compare commits

...

10 Commits

Author SHA1 Message Date
akwizgran
f663bf8667 Remove unused parameter. 2020-06-26 11:00:39 +01:00
akwizgran
ef5b91da89 Measure stability of each connection separately.
This should reduce the impact of connections that fail at the remote
end.
2020-06-26 11:00:39 +01:00
akwizgran
9909d205c7 Count stable connections even in case of connection failure. 2020-06-26 11:00:39 +01:00
akwizgran
9768b048d2 Impose limit on outgoing connections. 2020-06-26 11:00:38 +01:00
akwizgran
6dcad6c425 Count stable connections, but don't impose a connection limit. 2020-06-26 11:00:38 +01:00
akwizgran
648f26542c Simple connection limiter that closes connections cleanly. 2020-06-26 10:57:08 +01:00
akwizgran
730d553b0a Fix screenshot test (again). 2020-06-26 10:38:04 +01:00
akwizgran
7736a3b6fc Use separate methods for registering incoming and outgoing connections. 2020-06-26 09:59:03 +01:00
akwizgran
95f427863d Remove transport preferences for briar-headless. 2020-06-25 17:46:22 +01:00
akwizgran
78d7fc2106 Fix bug in reporting of connection state, add regression tests. 2020-06-02 12:00:06 +01:00
22 changed files with 280 additions and 146 deletions

View File

@@ -67,7 +67,7 @@ public class AndroidBluetoothPluginFactory implements DuplexPluginFactory {
@Override
public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl();
new BluetoothConnectionLimiterImpl(eventBus, clock);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE);
AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin(

View File

@@ -22,14 +22,42 @@ import java.util.Collection;
public interface ConnectionRegistry {
/**
* Registers a connection with the given contact over the given transport.
* Registers an incoming connection from the given contact over the given
* transport. The connection's {@link Priority priority} can be set later
* via {@link #setPriority(ContactId, TransportId, InterruptibleConnection,
* Priority)} if a priority record is received from the contact.
* <p>
* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts
* {@link ContactConnectedEvent} if this is the only connection with the
* contact.
*/
void registerConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming);
void registerIncomingConnection(ContactId c, TransportId t,
InterruptibleConnection conn);
/**
* Registers an outgoing connection to the given contact over the given
* transport.
* <p>
* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts
* {@link ContactConnectedEvent} if this is the only connection with the
* contact.
* <p>
* If the registry has any "better" connections with the given contact, the
* given connection will be interrupted. If the registry has any "worse"
* connections with the given contact, those connections will be
* interrupted.
* <p>
* Connection A is considered "better" than connection B if both
* connections have had their priorities set, and either A's transport is
* {@link PluginConfig#getTransportPreferences() preferred} to B's, or
* they use the same transport and A has higher {@link Priority priority}
* than B.
* <p>
* For backward compatibility, connections without priorities are not
* considered better or worse than other connections.
*/
void registerOutgoingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority);
/**
* Unregisters a connection with the given contact over the given transport.
@@ -43,8 +71,8 @@ public interface ConnectionRegistry {
/**
* Sets the {@link Priority priority} of a connection that was previously
* registered via {@link #registerConnection(ContactId, TransportId,
* InterruptibleConnection, boolean)}.
* registered via {@link #registerIncomingConnection(ContactId, TransportId,
* InterruptibleConnection)}.
* <p>
* If the registry has any "better" connections with the given contact, the
* given connection will be interrupted. If the registry has any "worse"

View File

@@ -2,6 +2,7 @@ package org.briarproject.bramble.api.sync;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.InputStream;
@@ -14,10 +15,10 @@ public interface SyncSessionFactory {
SyncSession createIncomingSession(ContactId c, InputStream in,
PriorityHandler handler);
SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency,
StreamWriter streamWriter);
SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, StreamWriter streamWriter,
SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority);
}

View File

@@ -0,0 +1,26 @@
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when all sync connections using a given
* transport should be closed.
*/
@Immutable
@NotNullByDefault
public class CloseSyncConnectionsEvent extends Event {
private final TransportId transportId;
public CloseSyncConnectionsEvent(TransportId transportId) {
this.transportId = transportId;
}
public TransportId getTransportId() {
return transportId;
}
}

View File

@@ -61,7 +61,25 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
@Override
public void registerConnection(ContactId c, TransportId t,
public void registerIncomingConnection(ContactId c, TransportId t,
InterruptibleConnection conn) {
if (LOG.isLoggable(INFO)) {
LOG.info("Incoming connection registered: " + t);
}
registerConnection(c, t, conn, true);
}
@Override
public void registerOutgoingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority) {
if (LOG.isLoggable(INFO)) {
LOG.info("Outgoing connection registered: " + t);
}
registerConnection(c, t, conn, false);
setPriority(c, t, conn, priority);
}
private void registerConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming) {
if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection registered: " + t);
@@ -214,7 +232,8 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
@Override
public boolean isConnected(ContactId c) {
synchronized (lock) {
return contactConnections.containsKey(c);
List<ConnectionRecord> recs = contactConnections.get(c);
return recs != null && !recs.isEmpty();
}
}

View File

@@ -47,20 +47,24 @@ abstract class DuplexSyncConnection extends SyncConnection
@Override
public void interruptOutgoingSession() {
SyncSession out = null;
synchronized (interruptLock) {
if (outgoingSession == null) interruptWaiting = true;
else outgoingSession.interrupt();
else out = outgoingSession;
}
if (out != null) out.interrupt();
}
void setOutgoingSession(SyncSession outgoingSession) {
boolean interruptWasWaiting = false;
synchronized (interruptLock) {
this.outgoingSession = outgoingSession;
if (interruptWaiting) {
outgoingSession.interrupt();
interruptWasWaiting = true;
interruptWaiting = false;
}
}
if (interruptWasWaiting) outgoingSession.interrupt();
}
DuplexSyncConnection(KeyManager keyManager,
@@ -99,6 +103,7 @@ abstract class DuplexSyncConnection extends SyncConnection
w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createDuplexOutgoingSession(c,
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter, priority);
ctx.getTransportId(), w.getMaxLatency(), w.getMaxIdleTime(),
streamWriter, priority);
}
}

View File

@@ -59,8 +59,8 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
onReadError(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId,
this, true);
connectionRegistry.registerIncomingConnection(contactId, transportId,
this);
// Start the outgoing session on another thread
ioExecutor.execute(() -> runOutgoingSession(contactId));
try {

View File

@@ -104,9 +104,8 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
onReadError();
return;
}
connectionRegistry.registerConnection(contactId, transportId,
this, false);
connectionRegistry.setPriority(contactId, transportId, this, priority);
connectionRegistry.registerOutgoingConnection(contactId, transportId,
this, priority);
try {
// Store any transport properties discovered from the connection
transportPropertyManager.addRemotePropertiesFromConnection(

View File

@@ -72,7 +72,7 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createSimplexOutgoingSession(c,
w.getMaxLatency(), streamWriter);
ctx.getTransportId(), w.getMaxLatency(), streamWriter);
}
}

View File

@@ -3,9 +3,16 @@ package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import static java.util.concurrent.TimeUnit.SECONDS;
@NotNullByDefault
interface BluetoothConnectionLimiter {
/**
* How long a connection must remain open before it's considered stable.
*/
long STABILITY_PERIOD_MS = SECONDS.toMillis(90);
/**
* Informs the limiter that key agreement has started.
*/
@@ -23,17 +30,9 @@ interface BluetoothConnectionLimiter {
boolean canOpenContactConnection();
/**
* Informs the limiter that a contact connection has been opened. The
* limiter may close the new connection if key agreement is in progress.
* <p/>
* Returns false if the limiter has closed the new connection.
* Informs the limiter that the given connection has been opened.
*/
boolean contactConnectionOpened(DuplexTransportConnection conn);
/**
* Informs the limiter that a key agreement connection has been opened.
*/
void keyAgreementConnectionOpened(DuplexTransportConnection conn);
void connectionOpened(DuplexTransportConnection conn);
/**
* Informs the limiter that the given connection has been closed.

View File

@@ -1,46 +1,53 @@
package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.util.LogUtils.logException;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.plugin.BluetoothConstants.ID;
@NotNullByDefault
@ThreadSafe
class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
private static final Logger LOG =
Logger.getLogger(BluetoothConnectionLimiterImpl.class.getName());
getLogger(BluetoothConnectionLimiterImpl.class.getName());
private final EventBus eventBus;
private final Clock clock;
private final Object lock = new Object();
// The following are locking: lock
private final LinkedList<DuplexTransportConnection> connections =
new LinkedList<>();
@GuardedBy("lock")
private final List<ConnectionRecord> connections = new LinkedList<>();
@GuardedBy("lock")
private boolean keyAgreementInProgress = false;
@GuardedBy("lock")
private int connectionLimit = 2;
BluetoothConnectionLimiterImpl(EventBus eventBus, Clock clock) {
this.eventBus = eventBus;
this.clock = clock;
}
@Override
public void keyAgreementStarted() {
List<DuplexTransportConnection> close;
synchronized (lock) {
keyAgreementInProgress = true;
close = new ArrayList<>(connections);
connections.clear();
}
if (LOG.isLoggable(INFO)) {
LOG.info("Key agreement started, closing " + close.size() +
" connections");
}
for (DuplexTransportConnection conn : close) tryToClose(conn);
LOG.info("Key agreement started");
eventBus.broadcast(new CloseSyncConnectionsEvent(ID));
}
@Override
@@ -57,60 +64,81 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
if (keyAgreementInProgress) {
LOG.info("Can't open contact connection during key agreement");
return false;
} else {
}
long now = clock.currentTimeMillis();
countStableConnections(now);
if (connections.size() < connectionLimit) {
LOG.info("Can open contact connection");
return true;
}
}
}
@Override
public boolean contactConnectionOpened(DuplexTransportConnection conn) {
boolean accept = true;
synchronized (lock) {
if (keyAgreementInProgress) {
LOG.info("Refusing contact connection during key agreement");
accept = false;
} else {
LOG.info("Accepting contact connection");
connections.add(conn);
LOG.info("Can't open contact connection due to limit");
return false;
}
}
if (!accept) tryToClose(conn);
return accept;
}
@Override
public void keyAgreementConnectionOpened(DuplexTransportConnection conn) {
public void connectionOpened(DuplexTransportConnection conn) {
synchronized (lock) {
LOG.info("Accepting key agreement connection");
connections.add(conn);
}
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getWriter().dispose(false);
conn.getReader().dispose(false, false);
} catch (IOException e) {
logException(LOG, WARNING, e);
long now = clock.currentTimeMillis();
countStableConnections(now);
connections.add(new ConnectionRecord(conn, now));
if (LOG.isLoggable(INFO)) {
LOG.info("Connection opened, " + connections.size() + " open");
}
}
}
@Override
public void connectionClosed(DuplexTransportConnection conn) {
synchronized (lock) {
connections.remove(conn);
if (LOG.isLoggable(INFO))
countStableConnections(clock.currentTimeMillis());
Iterator<ConnectionRecord> it = connections.iterator();
while (it.hasNext()) {
if (it.next().conn == conn) {
it.remove();
break;
}
}
if (LOG.isLoggable(INFO)) {
LOG.info("Connection closed, " + connections.size() + " open");
}
}
}
@Override
public void allConnectionsClosed() {
synchronized (lock) {
long now = clock.currentTimeMillis();
countStableConnections(now);
connections.clear();
LOG.info("All connections closed");
}
}
@GuardedBy("lock")
private void countStableConnections(long now) {
int stable = 0;
for (ConnectionRecord rec : connections) {
if (now - rec.timeOpened >= STABILITY_PERIOD_MS) stable++;
}
if (stable > connectionLimit) {
connectionLimit = stable;
if (LOG.isLoggable(INFO)) {
LOG.info("Raising connection limit to " + connectionLimit);
}
}
}
private static class ConnectionRecord {
private final DuplexTransportConnection conn;
private final long timeOpened;
private ConnectionRecord(DuplexTransportConnection conn,
long timeOpened) {
this.conn = conn;
this.timeOpened = timeOpened;
}
}
}

View File

@@ -232,10 +232,9 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
return;
}
LOG.info("Connection received");
if (connectionLimiter.contactConnectionOpened(conn)) {
backoff.reset();
callback.handleConnection(conn);
}
connectionLimiter.connectionOpened(conn);
backoff.reset();
callback.handleConnection(conn);
if (!running) return;
}
}
@@ -327,8 +326,8 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
String uuid = p.get(PROP_UUID);
if (isNullOrEmpty(uuid)) return null;
DuplexTransportConnection conn = connect(address, uuid);
if (conn == null) return null;
return connectionLimiter.contactConnectionOpened(conn) ? conn : null;
if (conn != null) connectionLimiter.connectionOpened(conn);
return conn;
}
@Override
@@ -384,7 +383,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
LOG.info("Connecting to key agreement UUID " + uuid);
conn = connect(address, uuid);
}
if (conn != null) connectionLimiter.keyAgreementConnectionOpened(conn);
if (conn != null) connectionLimiter.connectionOpened(conn);
return conn;
}
@@ -453,7 +452,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
public KeyAgreementConnection accept() throws IOException {
DuplexTransportConnection conn = acceptConnection(ss);
if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
connectionLimiter.keyAgreementConnectionOpened(conn);
connectionLimiter.connectionOpened(conn);
return new KeyAgreementConnection(conn, ID);
}

View File

@@ -11,6 +11,7 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer;
@@ -19,6 +20,7 @@ import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
@@ -73,6 +75,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final EventBus eventBus;
private final Clock clock;
private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency, maxIdleTime;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
@@ -90,14 +93,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
int maxIdleTime, StreamWriter streamWriter,
SyncRecordWriter recordWriter, @Nullable Priority priority) {
EventBus eventBus, Clock clock, ContactId contactId,
TransportId transportId, int maxLatency, int maxIdleTime,
StreamWriter streamWriter, SyncRecordWriter recordWriter,
@Nullable Priority priority) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.clock = clock;
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.streamWriter = streamWriter;
@@ -230,6 +235,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
}
}

View File

@@ -11,11 +11,13 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
@@ -56,6 +58,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final Executor dbExecutor;
private final EventBus eventBus;
private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
@@ -65,12 +68,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, int maxLatency,
StreamWriter streamWriter, SyncRecordWriter recordWriter) {
EventBus eventBus, ContactId contactId, TransportId transportId,
int maxLatency, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
this.streamWriter = streamWriter;
this.recordWriter = recordWriter;
@@ -123,6 +128,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
}
}

View File

@@ -5,6 +5,7 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncRecordReader;
@@ -58,23 +59,23 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
}
@Override
public SyncSession createSimplexOutgoingSession(ContactId c,
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, streamWriter, recordWriter);
}
@Override
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, StreamWriter streamWriter,
public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority) {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
maxLatency, maxIdleTime, streamWriter, recordWriter, priority);
}
}

View File

@@ -74,6 +74,10 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2));
assertEquals(emptyList(), c.getConnectedContacts(transportId3));
assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId3));
assertFalse(c.isConnected(contactId1));
assertFalse(c.isConnected(contactId1, transportId1));
assertFalse(c.isConnected(contactId1, transportId2));
assertFalse(c.isConnected(contactId1, transportId3));
// Check that a registered connection shows up - this should
// broadcast a ConnectionOpenedEvent and a ContactConnectedEvent
@@ -81,13 +85,15 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn1, true);
c.registerIncomingConnection(contactId1, transportId1, conn1);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
c.getConnectedContacts(transportId1));
assertEquals(singletonList(contactId1),
c.getConnectedOrBetterContacts(transportId1));
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId1, transportId1));
// Register another connection with the same contact and transport -
// this should broadcast a ConnectionOpenedEvent and lookup should be
@@ -95,13 +101,15 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn2, true);
c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
c.getConnectedContacts(transportId1));
assertEquals(singletonList(contactId1),
c.getConnectedOrBetterContacts(transportId1));
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId1, transportId1));
// Unregister one of the connections - this should broadcast a
// ConnectionClosedEvent and lookup should be unaffected
@@ -115,6 +123,8 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
c.getConnectedContacts(transportId1));
assertEquals(singletonList(contactId1),
c.getConnectedOrBetterContacts(transportId1));
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId1, transportId1));
// Unregister the other connection - this should broadcast a
// ConnectionClosedEvent and a ContactDisconnectedEvent
@@ -128,6 +138,8 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
assertEquals(emptyList(), c.getConnectedContacts(transportId1));
assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId1));
assertFalse(c.isConnected(contactId1));
assertFalse(c.isConnected(contactId1, transportId1));
// Try to unregister the connection again - exception should be thrown
try {
@@ -158,11 +170,20 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
exactly(2).of(eventBus).broadcast(with(any(
ContactConnectedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn1, true);
c.registerConnection(contactId2, transportId1, conn2, true);
c.registerConnection(contactId2, transportId2, conn3, true);
c.registerIncomingConnection(contactId1, transportId1, conn1);
c.registerIncomingConnection(contactId2, transportId1, conn2);
c.registerIncomingConnection(contactId2, transportId2, conn3);
context.assertIsSatisfied();
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId2));
assertTrue(c.isConnected(contactId1, transportId1));
assertFalse(c.isConnected(contactId1, transportId2));
assertTrue(c.isConnected(contactId2, transportId1));
assertTrue(c.isConnected(contactId2, transportId2));
Collection<ContactId> connected = c.getConnectedContacts(transportId1);
assertEquals(2, connected.size());
assertTrue(connected.contains(contactId1));
@@ -191,12 +212,12 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
ConnectionRegistry c =
new ConnectionRegistryImpl(eventBus, pluginConfig);
// Connect via transport 1 (worse than 2) and set priority to low
// Connect via transport 1 (worse than 2) with no priority set
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn1, true);
c.registerIncomingConnection(contactId1, transportId1, conn1);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -213,8 +234,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId2, conn2, true);
c.setPriority(contactId1, transportId2, conn2, high);
c.registerOutgoingConnection(contactId1, transportId2, conn2, high);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -232,8 +252,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId3, conn3, true);
c.setPriority(contactId1, transportId3, conn3, high);
c.registerOutgoingConnection(contactId1, transportId3, conn3, high);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -269,8 +288,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn1, true);
c.setPriority(contactId1, transportId1, conn1, low);
c.registerOutgoingConnection(contactId1, transportId1, conn1, low);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -291,8 +309,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn2).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId2, conn2, true);
c.setPriority(contactId1, transportId2, conn2, high);
c.registerOutgoingConnection(contactId1, transportId2, conn2, high);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -310,8 +327,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId3, conn3, true);
c.setPriority(contactId1, transportId3, conn3, low);
c.registerOutgoingConnection(contactId1, transportId3, conn3, low);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -370,8 +386,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn1, true);
c.setPriority(contactId1, transportId1, conn1, high);
c.registerOutgoingConnection(contactId1, transportId1, conn1, high);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -389,8 +404,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn1).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId2, conn2, true);
c.setPriority(contactId1, transportId2, conn2, low);
c.registerOutgoingConnection(contactId1, transportId2, conn2, low);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -408,7 +422,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId3, conn3, true);
c.registerOutgoingConnection(contactId1, transportId3, conn3, high);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -465,8 +479,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn1, true);
c.setPriority(contactId1, transportId1, conn1, high);
c.registerOutgoingConnection(contactId1, transportId1, conn1, high);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -478,7 +491,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn2, true);
c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -505,8 +518,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn3).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn3, true);
c.setPriority(contactId1, transportId1, conn3, low);
c.registerOutgoingConnection(contactId1, transportId1, conn3, low);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -530,8 +542,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn1, true);
c.setPriority(contactId1, transportId1, conn1, low);
c.registerOutgoingConnection(contactId1, transportId1, conn1, low);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),
@@ -543,7 +554,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}});
c.registerConnection(contactId1, transportId1, conn2, true);
c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied();
assertEquals(singletonList(contactId1),

View File

@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message;
@@ -23,6 +24,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getMessage;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTransportId;
public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@@ -36,14 +38,15 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = getContactId();
private final TransportId transportId = getTransportId();
private final Message message = getMessage(new GroupId(getRandomId()));
private final MessageId messageId = message.getId();
@Test
public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter,
recordWriter);
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false);
@@ -76,8 +79,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
public void testSomethingToSend() throws Exception {
Ack ack = new Ack(singletonList(messageId));
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter,
recordWriter);
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter);
Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false);

View File

@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory;
import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory;
import org.briarproject.bramble.api.reliability.ReliabilityLayerFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.plugin.bluetooth.JavaBluetoothPluginFactory;
import org.briarproject.bramble.plugin.modem.ModemPluginFactory;
import org.briarproject.bramble.plugin.tcp.LanTcpPluginFactory;
@@ -39,10 +40,11 @@ public class DesktopPluginModule extends PluginModule {
PluginConfig getPluginConfig(@IoExecutor Executor ioExecutor,
SecureRandom random, BackoffFactory backoffFactory,
ReliabilityLayerFactory reliabilityFactory,
ShutdownManager shutdownManager, EventBus eventBus,
ShutdownManager shutdownManager, EventBus eventBus, Clock clock,
TimeoutMonitor timeoutMonitor) {
DuplexPluginFactory bluetooth = new JavaBluetoothPluginFactory(
ioExecutor, random, eventBus, timeoutMonitor, backoffFactory);
ioExecutor, random, eventBus, clock, timeoutMonitor,
backoffFactory);
DuplexPluginFactory modem = new ModemPluginFactory(ioExecutor,
reliabilityFactory);
DuplexPluginFactory lan = new LanTcpPluginFactory(ioExecutor,

View File

@@ -9,6 +9,7 @@ import org.briarproject.bramble.api.plugin.PluginCallback;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory;
import org.briarproject.bramble.api.system.Clock;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
@@ -30,15 +31,17 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
private final Executor ioExecutor;
private final SecureRandom secureRandom;
private final EventBus eventBus;
private final Clock clock;
private final TimeoutMonitor timeoutMonitor;
private final BackoffFactory backoffFactory;
public JavaBluetoothPluginFactory(Executor ioExecutor,
SecureRandom secureRandom, EventBus eventBus,
SecureRandom secureRandom, EventBus eventBus, Clock clock,
TimeoutMonitor timeoutMonitor, BackoffFactory backoffFactory) {
this.ioExecutor = ioExecutor;
this.secureRandom = secureRandom;
this.eventBus = eventBus;
this.clock = clock;
this.timeoutMonitor = timeoutMonitor;
this.backoffFactory = backoffFactory;
}
@@ -56,7 +59,7 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
@Override
public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl();
new BluetoothConnectionLimiterImpl(eventBus, clock);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE);
JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(connectionLimiter,

View File

@@ -120,8 +120,8 @@ public class SetupDataTest extends ScreenshotTest {
// TODO add messages
connectionRegistry.registerConnection(bob.getId(), ID, () -> {
}, true);
connectionRegistry.registerIncomingConnection(bob.getId(), ID, () -> {
});
}
}

View File

@@ -9,7 +9,9 @@ import org.briarproject.bramble.api.db.DatabaseConfig
import org.briarproject.bramble.api.event.EventBus
import org.briarproject.bramble.api.lifecycle.IoExecutor
import org.briarproject.bramble.api.network.NetworkManager
import org.briarproject.bramble.api.plugin.*
import org.briarproject.bramble.api.plugin.BackoffFactory
import org.briarproject.bramble.api.plugin.PluginConfig
import org.briarproject.bramble.api.plugin.TransportId
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory
import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory
import org.briarproject.bramble.api.system.Clock
@@ -32,7 +34,6 @@ import org.briarproject.briar.headless.forums.HeadlessForumModule
import org.briarproject.briar.headless.messaging.HeadlessMessagingModule
import java.io.File
import java.util.Collections.emptyList
import java.util.Collections.singletonMap
import java.util.concurrent.Executor
import javax.inject.Singleton
import javax.net.SocketFactory
@@ -88,9 +89,7 @@ internal class HeadlessModule(private val appDir: File) {
override fun getDuplexFactories(): Collection<DuplexPluginFactory> = duplex
override fun getSimplexFactories(): Collection<SimplexPluginFactory> = emptyList()
override fun shouldPoll(): Boolean = true
// Prefer LAN to Bluetooth
override fun getTransportPreferences(): Map<TransportId, List<TransportId>> =
singletonMap(BluetoothConstants.ID, listOf(LanTcpConstants.ID))
override fun getTransportPreferences(): Map<TransportId, List<TransportId>> = emptyMap()
}
}

View File

@@ -5,8 +5,6 @@ import dagger.Module
import dagger.Provides
import org.briarproject.bramble.api.FeatureFlags
import org.briarproject.bramble.api.db.DatabaseConfig
import org.briarproject.bramble.api.plugin.BluetoothConstants
import org.briarproject.bramble.api.plugin.LanTcpConstants
import org.briarproject.bramble.api.plugin.PluginConfig
import org.briarproject.bramble.api.plugin.TransportId
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory
@@ -21,7 +19,6 @@ import org.briarproject.briar.headless.event.HeadlessEventModule
import org.briarproject.briar.headless.forums.HeadlessForumModule
import org.briarproject.briar.headless.messaging.HeadlessMessagingModule
import java.io.File
import java.util.*
import java.util.Collections.emptyList
import javax.inject.Singleton
@@ -59,9 +56,7 @@ internal class HeadlessTestModule(private val appDir: File) {
override fun getDuplexFactories(): Collection<DuplexPluginFactory> = emptyList()
override fun getSimplexFactories(): Collection<SimplexPluginFactory> = emptyList()
override fun shouldPoll(): Boolean = false
// Prefer LAN to Bluetooth
override fun getTransportPreferences(): Map<TransportId, List<TransportId>> =
Collections.singletonMap(BluetoothConstants.ID, listOf(LanTcpConstants.ID))
override fun getTransportPreferences(): Map<TransportId, List<TransportId>> = emptyMap()
}
}