Compare commits

...

10 Commits

Author SHA1 Message Date
akwizgran
f663bf8667 Remove unused parameter. 2020-06-26 11:00:39 +01:00
akwizgran
ef5b91da89 Measure stability of each connection separately.
This should reduce the impact of connections that fail at the remote
end.
2020-06-26 11:00:39 +01:00
akwizgran
9909d205c7 Count stable connections even in case of connection failure. 2020-06-26 11:00:39 +01:00
akwizgran
9768b048d2 Impose limit on outgoing connections. 2020-06-26 11:00:38 +01:00
akwizgran
6dcad6c425 Count stable connections, but don't impose a connection limit. 2020-06-26 11:00:38 +01:00
akwizgran
648f26542c Simple connection limiter that closes connections cleanly. 2020-06-26 10:57:08 +01:00
akwizgran
730d553b0a Fix screenshot test (again). 2020-06-26 10:38:04 +01:00
akwizgran
7736a3b6fc Use separate methods for registering incoming and outgoing connections. 2020-06-26 09:59:03 +01:00
akwizgran
95f427863d Remove transport preferences for briar-headless. 2020-06-25 17:46:22 +01:00
akwizgran
78d7fc2106 Fix bug in reporting of connection state, add regression tests. 2020-06-02 12:00:06 +01:00
22 changed files with 280 additions and 146 deletions

View File

@@ -67,7 +67,7 @@ public class AndroidBluetoothPluginFactory implements DuplexPluginFactory {
@Override @Override
public DuplexPlugin createPlugin(PluginCallback callback) { public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter = BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl(); new BluetoothConnectionLimiterImpl(eventBus, clock);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL, Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE); MAX_POLLING_INTERVAL, BACKOFF_BASE);
AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin( AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin(

View File

@@ -22,14 +22,42 @@ import java.util.Collection;
public interface ConnectionRegistry { public interface ConnectionRegistry {
/** /**
* Registers a connection with the given contact over the given transport. * Registers an incoming connection from the given contact over the given
* transport. The connection's {@link Priority priority} can be set later
* via {@link #setPriority(ContactId, TransportId, InterruptibleConnection,
* Priority)} if a priority record is received from the contact.
* <p> * <p>
* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts * Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts
* {@link ContactConnectedEvent} if this is the only connection with the * {@link ContactConnectedEvent} if this is the only connection with the
* contact. * contact.
*/ */
void registerConnection(ContactId c, TransportId t, void registerIncomingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming); InterruptibleConnection conn);
/**
* Registers an outgoing connection to the given contact over the given
* transport.
* <p>
* Broadcasts {@link ConnectionOpenedEvent}. Also broadcasts
* {@link ContactConnectedEvent} if this is the only connection with the
* contact.
* <p>
* If the registry has any "better" connections with the given contact, the
* given connection will be interrupted. If the registry has any "worse"
* connections with the given contact, those connections will be
* interrupted.
* <p>
* Connection A is considered "better" than connection B if both
* connections have had their priorities set, and either A's transport is
* {@link PluginConfig#getTransportPreferences() preferred} to B's, or
* they use the same transport and A has higher {@link Priority priority}
* than B.
* <p>
* For backward compatibility, connections without priorities are not
* considered better or worse than other connections.
*/
void registerOutgoingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority);
/** /**
* Unregisters a connection with the given contact over the given transport. * Unregisters a connection with the given contact over the given transport.
@@ -43,8 +71,8 @@ public interface ConnectionRegistry {
/** /**
* Sets the {@link Priority priority} of a connection that was previously * Sets the {@link Priority priority} of a connection that was previously
* registered via {@link #registerConnection(ContactId, TransportId, * registered via {@link #registerIncomingConnection(ContactId, TransportId,
* InterruptibleConnection, boolean)}. * InterruptibleConnection)}.
* <p> * <p>
* If the registry has any "better" connections with the given contact, the * If the registry has any "better" connections with the given contact, the
* given connection will be interrupted. If the registry has any "worse" * given connection will be interrupted. If the registry has any "worse"

View File

@@ -2,6 +2,7 @@ package org.briarproject.bramble.api.sync;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.InputStream; import java.io.InputStream;
@@ -14,10 +15,10 @@ public interface SyncSessionFactory {
SyncSession createIncomingSession(ContactId c, InputStream in, SyncSession createIncomingSession(ContactId c, InputStream in,
PriorityHandler handler); PriorityHandler handler);
SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency, SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
StreamWriter streamWriter); int maxLatency, StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxIdleTime, StreamWriter streamWriter, int maxLatency, int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority); @Nullable Priority priority);
} }

View File

@@ -0,0 +1,26 @@
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when all sync connections using a given
* transport should be closed.
*/
@Immutable
@NotNullByDefault
public class CloseSyncConnectionsEvent extends Event {
private final TransportId transportId;
public CloseSyncConnectionsEvent(TransportId transportId) {
this.transportId = transportId;
}
public TransportId getTransportId() {
return transportId;
}
}

View File

@@ -61,7 +61,25 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
} }
@Override @Override
public void registerConnection(ContactId c, TransportId t, public void registerIncomingConnection(ContactId c, TransportId t,
InterruptibleConnection conn) {
if (LOG.isLoggable(INFO)) {
LOG.info("Incoming connection registered: " + t);
}
registerConnection(c, t, conn, true);
}
@Override
public void registerOutgoingConnection(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority) {
if (LOG.isLoggable(INFO)) {
LOG.info("Outgoing connection registered: " + t);
}
registerConnection(c, t, conn, false);
setPriority(c, t, conn, priority);
}
private void registerConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming) { InterruptibleConnection conn, boolean incoming) {
if (LOG.isLoggable(INFO)) { if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection registered: " + t); if (incoming) LOG.info("Incoming connection registered: " + t);
@@ -214,7 +232,8 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
@Override @Override
public boolean isConnected(ContactId c) { public boolean isConnected(ContactId c) {
synchronized (lock) { synchronized (lock) {
return contactConnections.containsKey(c); List<ConnectionRecord> recs = contactConnections.get(c);
return recs != null && !recs.isEmpty();
} }
} }

View File

@@ -47,20 +47,24 @@ abstract class DuplexSyncConnection extends SyncConnection
@Override @Override
public void interruptOutgoingSession() { public void interruptOutgoingSession() {
SyncSession out = null;
synchronized (interruptLock) { synchronized (interruptLock) {
if (outgoingSession == null) interruptWaiting = true; if (outgoingSession == null) interruptWaiting = true;
else outgoingSession.interrupt(); else out = outgoingSession;
} }
if (out != null) out.interrupt();
} }
void setOutgoingSession(SyncSession outgoingSession) { void setOutgoingSession(SyncSession outgoingSession) {
boolean interruptWasWaiting = false;
synchronized (interruptLock) { synchronized (interruptLock) {
this.outgoingSession = outgoingSession; this.outgoingSession = outgoingSession;
if (interruptWaiting) { if (interruptWaiting) {
outgoingSession.interrupt(); interruptWasWaiting = true;
interruptWaiting = false; interruptWaiting = false;
} }
} }
if (interruptWasWaiting) outgoingSession.interrupt();
} }
DuplexSyncConnection(KeyManager keyManager, DuplexSyncConnection(KeyManager keyManager,
@@ -99,6 +103,7 @@ abstract class DuplexSyncConnection extends SyncConnection
w.getOutputStream(), ctx); w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId()); ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createDuplexOutgoingSession(c, return syncSessionFactory.createDuplexOutgoingSession(c,
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter, priority); ctx.getTransportId(), w.getMaxLatency(), w.getMaxIdleTime(),
streamWriter, priority);
} }
} }

View File

@@ -59,8 +59,8 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
onReadError(true); onReadError(true);
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId, connectionRegistry.registerIncomingConnection(contactId, transportId,
this, true); this);
// Start the outgoing session on another thread // Start the outgoing session on another thread
ioExecutor.execute(() -> runOutgoingSession(contactId)); ioExecutor.execute(() -> runOutgoingSession(contactId));
try { try {

View File

@@ -104,9 +104,8 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
onReadError(); onReadError();
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId, connectionRegistry.registerOutgoingConnection(contactId, transportId,
this, false); this, priority);
connectionRegistry.setPriority(contactId, transportId, this, priority);
try { try {
// Store any transport properties discovered from the connection // Store any transport properties discovered from the connection
transportPropertyManager.addRemotePropertiesFromConnection( transportPropertyManager.addRemotePropertiesFromConnection(

View File

@@ -72,7 +72,7 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
w.getOutputStream(), ctx); w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId()); ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createSimplexOutgoingSession(c, return syncSessionFactory.createSimplexOutgoingSession(c,
w.getMaxLatency(), streamWriter); ctx.getTransportId(), w.getMaxLatency(), streamWriter);
} }
} }

View File

@@ -3,9 +3,16 @@ package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import static java.util.concurrent.TimeUnit.SECONDS;
@NotNullByDefault @NotNullByDefault
interface BluetoothConnectionLimiter { interface BluetoothConnectionLimiter {
/**
* How long a connection must remain open before it's considered stable.
*/
long STABILITY_PERIOD_MS = SECONDS.toMillis(90);
/** /**
* Informs the limiter that key agreement has started. * Informs the limiter that key agreement has started.
*/ */
@@ -23,17 +30,9 @@ interface BluetoothConnectionLimiter {
boolean canOpenContactConnection(); boolean canOpenContactConnection();
/** /**
* Informs the limiter that a contact connection has been opened. The * Informs the limiter that the given connection has been opened.
* limiter may close the new connection if key agreement is in progress.
* <p/>
* Returns false if the limiter has closed the new connection.
*/ */
boolean contactConnectionOpened(DuplexTransportConnection conn); void connectionOpened(DuplexTransportConnection conn);
/**
* Informs the limiter that a key agreement connection has been opened.
*/
void keyAgreementConnectionOpened(DuplexTransportConnection conn);
/** /**
* Informs the limiter that the given connection has been closed. * Informs the limiter that the given connection has been closed.

View File

@@ -1,46 +1,53 @@
package org.briarproject.bramble.plugin.bluetooth; package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException; import java.util.Iterator;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.util.LogUtils.logException; import static org.briarproject.bramble.api.plugin.BluetoothConstants.ID;
@NotNullByDefault @NotNullByDefault
@ThreadSafe @ThreadSafe
class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter { class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(BluetoothConnectionLimiterImpl.class.getName()); getLogger(BluetoothConnectionLimiterImpl.class.getName());
private final EventBus eventBus;
private final Clock clock;
private final Object lock = new Object(); private final Object lock = new Object();
// The following are locking: lock @GuardedBy("lock")
private final LinkedList<DuplexTransportConnection> connections = private final List<ConnectionRecord> connections = new LinkedList<>();
new LinkedList<>(); @GuardedBy("lock")
private boolean keyAgreementInProgress = false; private boolean keyAgreementInProgress = false;
@GuardedBy("lock")
private int connectionLimit = 2;
BluetoothConnectionLimiterImpl(EventBus eventBus, Clock clock) {
this.eventBus = eventBus;
this.clock = clock;
}
@Override @Override
public void keyAgreementStarted() { public void keyAgreementStarted() {
List<DuplexTransportConnection> close;
synchronized (lock) { synchronized (lock) {
keyAgreementInProgress = true; keyAgreementInProgress = true;
close = new ArrayList<>(connections);
connections.clear();
} }
if (LOG.isLoggable(INFO)) { LOG.info("Key agreement started");
LOG.info("Key agreement started, closing " + close.size() + eventBus.broadcast(new CloseSyncConnectionsEvent(ID));
" connections");
}
for (DuplexTransportConnection conn : close) tryToClose(conn);
} }
@Override @Override
@@ -57,60 +64,81 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
if (keyAgreementInProgress) { if (keyAgreementInProgress) {
LOG.info("Can't open contact connection during key agreement"); LOG.info("Can't open contact connection during key agreement");
return false; return false;
} else { }
long now = clock.currentTimeMillis();
countStableConnections(now);
if (connections.size() < connectionLimit) {
LOG.info("Can open contact connection"); LOG.info("Can open contact connection");
return true; return true;
}
}
}
@Override
public boolean contactConnectionOpened(DuplexTransportConnection conn) {
boolean accept = true;
synchronized (lock) {
if (keyAgreementInProgress) {
LOG.info("Refusing contact connection during key agreement");
accept = false;
} else { } else {
LOG.info("Accepting contact connection"); LOG.info("Can't open contact connection due to limit");
connections.add(conn); return false;
} }
} }
if (!accept) tryToClose(conn);
return accept;
} }
@Override @Override
public void keyAgreementConnectionOpened(DuplexTransportConnection conn) { public void connectionOpened(DuplexTransportConnection conn) {
synchronized (lock) { synchronized (lock) {
LOG.info("Accepting key agreement connection"); long now = clock.currentTimeMillis();
connections.add(conn); countStableConnections(now);
} connections.add(new ConnectionRecord(conn, now));
} if (LOG.isLoggable(INFO)) {
LOG.info("Connection opened, " + connections.size() + " open");
private void tryToClose(DuplexTransportConnection conn) { }
try {
conn.getWriter().dispose(false);
conn.getReader().dispose(false, false);
} catch (IOException e) {
logException(LOG, WARNING, e);
} }
} }
@Override @Override
public void connectionClosed(DuplexTransportConnection conn) { public void connectionClosed(DuplexTransportConnection conn) {
synchronized (lock) { synchronized (lock) {
connections.remove(conn); countStableConnections(clock.currentTimeMillis());
if (LOG.isLoggable(INFO)) Iterator<ConnectionRecord> it = connections.iterator();
while (it.hasNext()) {
if (it.next().conn == conn) {
it.remove();
break;
}
}
if (LOG.isLoggable(INFO)) {
LOG.info("Connection closed, " + connections.size() + " open"); LOG.info("Connection closed, " + connections.size() + " open");
}
} }
} }
@Override @Override
public void allConnectionsClosed() { public void allConnectionsClosed() {
synchronized (lock) { synchronized (lock) {
long now = clock.currentTimeMillis();
countStableConnections(now);
connections.clear(); connections.clear();
LOG.info("All connections closed"); LOG.info("All connections closed");
} }
} }
@GuardedBy("lock")
private void countStableConnections(long now) {
int stable = 0;
for (ConnectionRecord rec : connections) {
if (now - rec.timeOpened >= STABILITY_PERIOD_MS) stable++;
}
if (stable > connectionLimit) {
connectionLimit = stable;
if (LOG.isLoggable(INFO)) {
LOG.info("Raising connection limit to " + connectionLimit);
}
}
}
private static class ConnectionRecord {
private final DuplexTransportConnection conn;
private final long timeOpened;
private ConnectionRecord(DuplexTransportConnection conn,
long timeOpened) {
this.conn = conn;
this.timeOpened = timeOpened;
}
}
} }

View File

@@ -232,10 +232,9 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
return; return;
} }
LOG.info("Connection received"); LOG.info("Connection received");
if (connectionLimiter.contactConnectionOpened(conn)) { connectionLimiter.connectionOpened(conn);
backoff.reset(); backoff.reset();
callback.handleConnection(conn); callback.handleConnection(conn);
}
if (!running) return; if (!running) return;
} }
} }
@@ -327,8 +326,8 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
String uuid = p.get(PROP_UUID); String uuid = p.get(PROP_UUID);
if (isNullOrEmpty(uuid)) return null; if (isNullOrEmpty(uuid)) return null;
DuplexTransportConnection conn = connect(address, uuid); DuplexTransportConnection conn = connect(address, uuid);
if (conn == null) return null; if (conn != null) connectionLimiter.connectionOpened(conn);
return connectionLimiter.contactConnectionOpened(conn) ? conn : null; return conn;
} }
@Override @Override
@@ -384,7 +383,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
LOG.info("Connecting to key agreement UUID " + uuid); LOG.info("Connecting to key agreement UUID " + uuid);
conn = connect(address, uuid); conn = connect(address, uuid);
} }
if (conn != null) connectionLimiter.keyAgreementConnectionOpened(conn); if (conn != null) connectionLimiter.connectionOpened(conn);
return conn; return conn;
} }
@@ -453,7 +452,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
public KeyAgreementConnection accept() throws IOException { public KeyAgreementConnection accept() throws IOException {
DuplexTransportConnection conn = acceptConnection(ss); DuplexTransportConnection conn = acceptConnection(ss);
if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection"); if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
connectionLimiter.keyAgreementConnectionOpened(conn); connectionLimiter.connectionOpened(conn);
return new KeyAgreementConnection(conn, ID); return new KeyAgreementConnection(conn, ID);
} }

View File

@@ -11,6 +11,7 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
@@ -19,6 +20,7 @@ import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent; import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
@@ -73,6 +75,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final EventBus eventBus; private final EventBus eventBus;
private final Clock clock; private final Clock clock;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency, maxIdleTime; private final int maxLatency, maxIdleTime;
private final StreamWriter streamWriter; private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter; private final SyncRecordWriter recordWriter;
@@ -90,14 +93,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency, EventBus eventBus, Clock clock, ContactId contactId,
int maxIdleTime, StreamWriter streamWriter, TransportId transportId, int maxLatency, int maxIdleTime,
SyncRecordWriter recordWriter, @Nullable Priority priority) { StreamWriter streamWriter, SyncRecordWriter recordWriter,
@Nullable Priority priority) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.clock = clock; this.clock = clock;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime; this.maxIdleTime = maxIdleTime;
this.streamWriter = streamWriter; this.streamWriter = streamWriter;
@@ -230,6 +235,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) { } else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e; LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt(); if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
} }
} }

View File

@@ -11,11 +11,13 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException; import java.io.IOException;
@@ -56,6 +58,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final Executor dbExecutor; private final Executor dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency; private final int maxLatency;
private final StreamWriter streamWriter; private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter; private final SyncRecordWriter recordWriter;
@@ -65,12 +68,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, int maxLatency, EventBus eventBus, ContactId contactId, TransportId transportId,
StreamWriter streamWriter, SyncRecordWriter recordWriter) { int maxLatency, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.streamWriter = streamWriter; this.streamWriter = streamWriter;
this.recordWriter = recordWriter; this.recordWriter = recordWriter;
@@ -123,6 +128,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) { } else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e; LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt(); if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
} }
} }

View File

@@ -5,6 +5,7 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.SyncRecordReader;
@@ -58,23 +59,23 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
} }
@Override @Override
public SyncSession createSimplexOutgoingSession(ContactId c, public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, StreamWriter streamWriter) { int maxLatency, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream(); OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter = SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out); recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, streamWriter, recordWriter); maxLatency, streamWriter, recordWriter);
} }
@Override @Override
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxIdleTime, StreamWriter streamWriter, int maxLatency, int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority) { @Nullable Priority priority) {
OutputStream out = streamWriter.getOutputStream(); OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter = SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out); recordWriterFactory.createRecordWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
maxLatency, maxIdleTime, streamWriter, recordWriter, priority); maxLatency, maxIdleTime, streamWriter, recordWriter, priority);
} }
} }

View File

@@ -74,6 +74,10 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2)); assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId2));
assertEquals(emptyList(), c.getConnectedContacts(transportId3)); assertEquals(emptyList(), c.getConnectedContacts(transportId3));
assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId3)); assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId3));
assertFalse(c.isConnected(contactId1));
assertFalse(c.isConnected(contactId1, transportId1));
assertFalse(c.isConnected(contactId1, transportId2));
assertFalse(c.isConnected(contactId1, transportId3));
// Check that a registered connection shows up - this should // Check that a registered connection shows up - this should
// broadcast a ConnectionOpenedEvent and a ContactConnectedEvent // broadcast a ConnectionOpenedEvent and a ContactConnectedEvent
@@ -81,13 +85,15 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerIncomingConnection(contactId1, transportId1, conn1);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
c.getConnectedOrBetterContacts(transportId1)); c.getConnectedOrBetterContacts(transportId1));
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId1, transportId1));
// Register another connection with the same contact and transport - // Register another connection with the same contact and transport -
// this should broadcast a ConnectionOpenedEvent and lookup should be // this should broadcast a ConnectionOpenedEvent and lookup should be
@@ -95,13 +101,15 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn2, true); c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
c.getConnectedOrBetterContacts(transportId1)); c.getConnectedOrBetterContacts(transportId1));
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId1, transportId1));
// Unregister one of the connections - this should broadcast a // Unregister one of the connections - this should broadcast a
// ConnectionClosedEvent and lookup should be unaffected // ConnectionClosedEvent and lookup should be unaffected
@@ -115,6 +123,8 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
c.getConnectedOrBetterContacts(transportId1)); c.getConnectedOrBetterContacts(transportId1));
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId1, transportId1));
// Unregister the other connection - this should broadcast a // Unregister the other connection - this should broadcast a
// ConnectionClosedEvent and a ContactDisconnectedEvent // ConnectionClosedEvent and a ContactDisconnectedEvent
@@ -128,6 +138,8 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
assertEquals(emptyList(), c.getConnectedContacts(transportId1)); assertEquals(emptyList(), c.getConnectedContacts(transportId1));
assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId1)); assertEquals(emptyList(), c.getConnectedOrBetterContacts(transportId1));
assertFalse(c.isConnected(contactId1));
assertFalse(c.isConnected(contactId1, transportId1));
// Try to unregister the connection again - exception should be thrown // Try to unregister the connection again - exception should be thrown
try { try {
@@ -158,11 +170,20 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
exactly(2).of(eventBus).broadcast(with(any( exactly(2).of(eventBus).broadcast(with(any(
ContactConnectedEvent.class))); ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerIncomingConnection(contactId1, transportId1, conn1);
c.registerConnection(contactId2, transportId1, conn2, true); c.registerIncomingConnection(contactId2, transportId1, conn2);
c.registerConnection(contactId2, transportId2, conn3, true); c.registerIncomingConnection(contactId2, transportId2, conn3);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertTrue(c.isConnected(contactId1));
assertTrue(c.isConnected(contactId2));
assertTrue(c.isConnected(contactId1, transportId1));
assertFalse(c.isConnected(contactId1, transportId2));
assertTrue(c.isConnected(contactId2, transportId1));
assertTrue(c.isConnected(contactId2, transportId2));
Collection<ContactId> connected = c.getConnectedContacts(transportId1); Collection<ContactId> connected = c.getConnectedContacts(transportId1);
assertEquals(2, connected.size()); assertEquals(2, connected.size());
assertTrue(connected.contains(contactId1)); assertTrue(connected.contains(contactId1));
@@ -191,12 +212,12 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
ConnectionRegistry c = ConnectionRegistry c =
new ConnectionRegistryImpl(eventBus, pluginConfig); new ConnectionRegistryImpl(eventBus, pluginConfig);
// Connect via transport 1 (worse than 2) and set priority to low // Connect via transport 1 (worse than 2) with no priority set
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerIncomingConnection(contactId1, transportId1, conn1);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -213,8 +234,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId2, conn2, true); c.registerOutgoingConnection(contactId1, transportId2, conn2, high);
c.setPriority(contactId1, transportId2, conn2, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -232,8 +252,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId3, conn3, true); c.registerOutgoingConnection(contactId1, transportId3, conn3, high);
c.setPriority(contactId1, transportId3, conn3, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -269,8 +288,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, low);
c.setPriority(contactId1, transportId1, conn1, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -291,8 +309,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn2).interruptOutgoingSession(); oneOf(conn2).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId2, conn2, true); c.registerOutgoingConnection(contactId1, transportId2, conn2, high);
c.setPriority(contactId1, transportId2, conn2, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -310,8 +327,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId3, conn3, true); c.registerOutgoingConnection(contactId1, transportId3, conn3, low);
c.setPriority(contactId1, transportId3, conn3, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -370,8 +386,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, high);
c.setPriority(contactId1, transportId1, conn1, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -389,8 +404,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn1).interruptOutgoingSession(); oneOf(conn1).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId2, conn2, true); c.registerOutgoingConnection(contactId1, transportId2, conn2, low);
c.setPriority(contactId1, transportId2, conn2, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -408,7 +422,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId3, conn3, true); c.registerOutgoingConnection(contactId1, transportId3, conn3, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -465,8 +479,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, high);
c.setPriority(contactId1, transportId1, conn1, high);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -478,7 +491,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn2, true); c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -505,8 +518,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(conn3).interruptOutgoingSession(); oneOf(conn3).interruptOutgoingSession();
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn3, true); c.registerOutgoingConnection(contactId1, transportId1, conn3, low);
c.setPriority(contactId1, transportId1, conn3, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -530,8 +542,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class))); oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn1, true); c.registerOutgoingConnection(contactId1, transportId1, conn1, low);
c.setPriority(contactId1, transportId1, conn1, low);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -543,7 +554,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
}}); }});
c.registerConnection(contactId1, transportId1, conn2, true); c.registerIncomingConnection(contactId1, transportId1, conn2);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),

View File

@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
@@ -23,6 +24,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getMessage;
import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTransportId;
public class SimplexOutgoingSessionTest extends BrambleMockTestCase { public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@@ -36,14 +38,15 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final Executor dbExecutor = new ImmediateExecutor(); private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = getContactId(); private final ContactId contactId = getContactId();
private final TransportId transportId = getTransportId();
private final Message message = getMessage(new GroupId(getRandomId())); private final Message message = getMessage(new GroupId(getRandomId()));
private final MessageId messageId = message.getId(); private final MessageId messageId = message.getId();
@Test @Test
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
recordWriter); streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false);
@@ -76,8 +79,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
public void testSomethingToSend() throws Exception { public void testSomethingToSend() throws Exception {
Ack ack = new Ack(singletonList(messageId)); Ack ack = new Ack(singletonList(messageId));
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
recordWriter); streamWriter, recordWriter);
Transaction ackTxn = new Transaction(null, false); Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false);

View File

@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory; import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory;
import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory;
import org.briarproject.bramble.api.reliability.ReliabilityLayerFactory; import org.briarproject.bramble.api.reliability.ReliabilityLayerFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.plugin.bluetooth.JavaBluetoothPluginFactory; import org.briarproject.bramble.plugin.bluetooth.JavaBluetoothPluginFactory;
import org.briarproject.bramble.plugin.modem.ModemPluginFactory; import org.briarproject.bramble.plugin.modem.ModemPluginFactory;
import org.briarproject.bramble.plugin.tcp.LanTcpPluginFactory; import org.briarproject.bramble.plugin.tcp.LanTcpPluginFactory;
@@ -39,10 +40,11 @@ public class DesktopPluginModule extends PluginModule {
PluginConfig getPluginConfig(@IoExecutor Executor ioExecutor, PluginConfig getPluginConfig(@IoExecutor Executor ioExecutor,
SecureRandom random, BackoffFactory backoffFactory, SecureRandom random, BackoffFactory backoffFactory,
ReliabilityLayerFactory reliabilityFactory, ReliabilityLayerFactory reliabilityFactory,
ShutdownManager shutdownManager, EventBus eventBus, ShutdownManager shutdownManager, EventBus eventBus, Clock clock,
TimeoutMonitor timeoutMonitor) { TimeoutMonitor timeoutMonitor) {
DuplexPluginFactory bluetooth = new JavaBluetoothPluginFactory( DuplexPluginFactory bluetooth = new JavaBluetoothPluginFactory(
ioExecutor, random, eventBus, timeoutMonitor, backoffFactory); ioExecutor, random, eventBus, clock, timeoutMonitor,
backoffFactory);
DuplexPluginFactory modem = new ModemPluginFactory(ioExecutor, DuplexPluginFactory modem = new ModemPluginFactory(ioExecutor,
reliabilityFactory); reliabilityFactory);
DuplexPluginFactory lan = new LanTcpPluginFactory(ioExecutor, DuplexPluginFactory lan = new LanTcpPluginFactory(ioExecutor,

View File

@@ -9,6 +9,7 @@ import org.briarproject.bramble.api.plugin.PluginCallback;
import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory; import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory;
import org.briarproject.bramble.api.system.Clock;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@@ -30,15 +31,17 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
private final Executor ioExecutor; private final Executor ioExecutor;
private final SecureRandom secureRandom; private final SecureRandom secureRandom;
private final EventBus eventBus; private final EventBus eventBus;
private final Clock clock;
private final TimeoutMonitor timeoutMonitor; private final TimeoutMonitor timeoutMonitor;
private final BackoffFactory backoffFactory; private final BackoffFactory backoffFactory;
public JavaBluetoothPluginFactory(Executor ioExecutor, public JavaBluetoothPluginFactory(Executor ioExecutor,
SecureRandom secureRandom, EventBus eventBus, SecureRandom secureRandom, EventBus eventBus, Clock clock,
TimeoutMonitor timeoutMonitor, BackoffFactory backoffFactory) { TimeoutMonitor timeoutMonitor, BackoffFactory backoffFactory) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.secureRandom = secureRandom; this.secureRandom = secureRandom;
this.eventBus = eventBus; this.eventBus = eventBus;
this.clock = clock;
this.timeoutMonitor = timeoutMonitor; this.timeoutMonitor = timeoutMonitor;
this.backoffFactory = backoffFactory; this.backoffFactory = backoffFactory;
} }
@@ -56,7 +59,7 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
@Override @Override
public DuplexPlugin createPlugin(PluginCallback callback) { public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter = BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl(); new BluetoothConnectionLimiterImpl(eventBus, clock);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL, Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE); MAX_POLLING_INTERVAL, BACKOFF_BASE);
JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(connectionLimiter, JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(connectionLimiter,

View File

@@ -120,8 +120,8 @@ public class SetupDataTest extends ScreenshotTest {
// TODO add messages // TODO add messages
connectionRegistry.registerConnection(bob.getId(), ID, () -> { connectionRegistry.registerIncomingConnection(bob.getId(), ID, () -> {
}, true); });
} }
} }

View File

@@ -9,7 +9,9 @@ import org.briarproject.bramble.api.db.DatabaseConfig
import org.briarproject.bramble.api.event.EventBus import org.briarproject.bramble.api.event.EventBus
import org.briarproject.bramble.api.lifecycle.IoExecutor import org.briarproject.bramble.api.lifecycle.IoExecutor
import org.briarproject.bramble.api.network.NetworkManager import org.briarproject.bramble.api.network.NetworkManager
import org.briarproject.bramble.api.plugin.* import org.briarproject.bramble.api.plugin.BackoffFactory
import org.briarproject.bramble.api.plugin.PluginConfig
import org.briarproject.bramble.api.plugin.TransportId
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory
import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory
import org.briarproject.bramble.api.system.Clock import org.briarproject.bramble.api.system.Clock
@@ -32,7 +34,6 @@ import org.briarproject.briar.headless.forums.HeadlessForumModule
import org.briarproject.briar.headless.messaging.HeadlessMessagingModule import org.briarproject.briar.headless.messaging.HeadlessMessagingModule
import java.io.File import java.io.File
import java.util.Collections.emptyList import java.util.Collections.emptyList
import java.util.Collections.singletonMap
import java.util.concurrent.Executor import java.util.concurrent.Executor
import javax.inject.Singleton import javax.inject.Singleton
import javax.net.SocketFactory import javax.net.SocketFactory
@@ -88,9 +89,7 @@ internal class HeadlessModule(private val appDir: File) {
override fun getDuplexFactories(): Collection<DuplexPluginFactory> = duplex override fun getDuplexFactories(): Collection<DuplexPluginFactory> = duplex
override fun getSimplexFactories(): Collection<SimplexPluginFactory> = emptyList() override fun getSimplexFactories(): Collection<SimplexPluginFactory> = emptyList()
override fun shouldPoll(): Boolean = true override fun shouldPoll(): Boolean = true
// Prefer LAN to Bluetooth override fun getTransportPreferences(): Map<TransportId, List<TransportId>> = emptyMap()
override fun getTransportPreferences(): Map<TransportId, List<TransportId>> =
singletonMap(BluetoothConstants.ID, listOf(LanTcpConstants.ID))
} }
} }

View File

@@ -5,8 +5,6 @@ import dagger.Module
import dagger.Provides import dagger.Provides
import org.briarproject.bramble.api.FeatureFlags import org.briarproject.bramble.api.FeatureFlags
import org.briarproject.bramble.api.db.DatabaseConfig import org.briarproject.bramble.api.db.DatabaseConfig
import org.briarproject.bramble.api.plugin.BluetoothConstants
import org.briarproject.bramble.api.plugin.LanTcpConstants
import org.briarproject.bramble.api.plugin.PluginConfig import org.briarproject.bramble.api.plugin.PluginConfig
import org.briarproject.bramble.api.plugin.TransportId import org.briarproject.bramble.api.plugin.TransportId
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory
@@ -21,7 +19,6 @@ import org.briarproject.briar.headless.event.HeadlessEventModule
import org.briarproject.briar.headless.forums.HeadlessForumModule import org.briarproject.briar.headless.forums.HeadlessForumModule
import org.briarproject.briar.headless.messaging.HeadlessMessagingModule import org.briarproject.briar.headless.messaging.HeadlessMessagingModule
import java.io.File import java.io.File
import java.util.*
import java.util.Collections.emptyList import java.util.Collections.emptyList
import javax.inject.Singleton import javax.inject.Singleton
@@ -59,9 +56,7 @@ internal class HeadlessTestModule(private val appDir: File) {
override fun getDuplexFactories(): Collection<DuplexPluginFactory> = emptyList() override fun getDuplexFactories(): Collection<DuplexPluginFactory> = emptyList()
override fun getSimplexFactories(): Collection<SimplexPluginFactory> = emptyList() override fun getSimplexFactories(): Collection<SimplexPluginFactory> = emptyList()
override fun shouldPoll(): Boolean = false override fun shouldPoll(): Boolean = false
// Prefer LAN to Bluetooth override fun getTransportPreferences(): Map<TransportId, List<TransportId>> = emptyMap()
override fun getTransportPreferences(): Map<TransportId, List<TransportId>> =
Collections.singletonMap(BluetoothConstants.ID, listOf(LanTcpConstants.ID))
} }
} }