Merge branch '1712-simple-connection-limiter' into 'master'

Simple connection limiter that closes connections cleanly

Closes #1712

See merge request briar/briar!1254
This commit is contained in:
Torsten Grote
2020-06-26 11:36:21 +00:00
13 changed files with 108 additions and 85 deletions

View File

@@ -67,7 +67,7 @@ public class AndroidBluetoothPluginFactory implements DuplexPluginFactory {
@Override @Override
public DuplexPlugin createPlugin(PluginCallback callback) { public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter = BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl(); new BluetoothConnectionLimiterImpl(eventBus);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL, Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE); MAX_POLLING_INTERVAL, BACKOFF_BASE);
AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin( AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin(

View File

@@ -2,6 +2,7 @@ package org.briarproject.bramble.api.sync;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.InputStream; import java.io.InputStream;
@@ -14,10 +15,10 @@ public interface SyncSessionFactory {
SyncSession createIncomingSession(ContactId c, InputStream in, SyncSession createIncomingSession(ContactId c, InputStream in,
PriorityHandler handler); PriorityHandler handler);
SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency, SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
StreamWriter streamWriter); int maxLatency, StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxIdleTime, StreamWriter streamWriter, int maxLatency, int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority); @Nullable Priority priority);
} }

View File

@@ -0,0 +1,26 @@
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when all sync connections using a given
* transport should be closed.
*/
@Immutable
@NotNullByDefault
public class CloseSyncConnectionsEvent extends Event {
private final TransportId transportId;
public CloseSyncConnectionsEvent(TransportId transportId) {
this.transportId = transportId;
}
public TransportId getTransportId() {
return transportId;
}
}

View File

@@ -47,20 +47,24 @@ abstract class DuplexSyncConnection extends SyncConnection
@Override @Override
public void interruptOutgoingSession() { public void interruptOutgoingSession() {
SyncSession out = null;
synchronized (interruptLock) { synchronized (interruptLock) {
if (outgoingSession == null) interruptWaiting = true; if (outgoingSession == null) interruptWaiting = true;
else outgoingSession.interrupt(); else out = outgoingSession;
} }
if (out != null) out.interrupt();
} }
void setOutgoingSession(SyncSession outgoingSession) { void setOutgoingSession(SyncSession outgoingSession) {
boolean interruptWasWaiting = false;
synchronized (interruptLock) { synchronized (interruptLock) {
this.outgoingSession = outgoingSession; this.outgoingSession = outgoingSession;
if (interruptWaiting) { if (interruptWaiting) {
outgoingSession.interrupt(); interruptWasWaiting = true;
interruptWaiting = false; interruptWaiting = false;
} }
} }
if (interruptWasWaiting) outgoingSession.interrupt();
} }
DuplexSyncConnection(KeyManager keyManager, DuplexSyncConnection(KeyManager keyManager,
@@ -99,6 +103,7 @@ abstract class DuplexSyncConnection extends SyncConnection
w.getOutputStream(), ctx); w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId()); ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createDuplexOutgoingSession(c, return syncSessionFactory.createDuplexOutgoingSession(c,
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter, priority); ctx.getTransportId(), w.getMaxLatency(), w.getMaxIdleTime(),
streamWriter, priority);
} }
} }

View File

@@ -72,7 +72,7 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
w.getOutputStream(), ctx); w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId()); ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createSimplexOutgoingSession(c, return syncSessionFactory.createSimplexOutgoingSession(c,
w.getMaxLatency(), streamWriter); ctx.getTransportId(), w.getMaxLatency(), streamWriter);
} }
} }

View File

@@ -23,17 +23,9 @@ interface BluetoothConnectionLimiter {
boolean canOpenContactConnection(); boolean canOpenContactConnection();
/** /**
* Informs the limiter that a contact connection has been opened. The * Informs the limiter that the given connection has been opened.
* limiter may close the new connection if key agreement is in progress.
* <p/>
* Returns false if the limiter has closed the new connection.
*/ */
boolean contactConnectionOpened(DuplexTransportConnection conn); void connectionOpened(DuplexTransportConnection conn);
/**
* Informs the limiter that a key agreement connection has been opened.
*/
void keyAgreementConnectionOpened(DuplexTransportConnection conn);
/** /**
* Informs the limiter that the given connection has been closed. * Informs the limiter that the given connection has been closed.

View File

@@ -1,46 +1,48 @@
package org.briarproject.bramble.plugin.bluetooth; package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.util.LogUtils.logException; import static org.briarproject.bramble.api.plugin.BluetoothConstants.ID;
@NotNullByDefault @NotNullByDefault
@ThreadSafe @ThreadSafe
class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter { class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(BluetoothConnectionLimiterImpl.class.getName()); getLogger(BluetoothConnectionLimiterImpl.class.getName());
private final EventBus eventBus;
private final Object lock = new Object(); private final Object lock = new Object();
// The following are locking: lock @GuardedBy("lock")
private final LinkedList<DuplexTransportConnection> connections = private final List<DuplexTransportConnection> connections =
new LinkedList<>(); new LinkedList<>();
@GuardedBy("lock")
private boolean keyAgreementInProgress = false; private boolean keyAgreementInProgress = false;
BluetoothConnectionLimiterImpl(EventBus eventBus) {
this.eventBus = eventBus;
}
@Override @Override
public void keyAgreementStarted() { public void keyAgreementStarted() {
List<DuplexTransportConnection> close;
synchronized (lock) { synchronized (lock) {
keyAgreementInProgress = true; keyAgreementInProgress = true;
close = new ArrayList<>(connections);
connections.clear();
} }
if (LOG.isLoggable(INFO)) { LOG.info("Key agreement started");
LOG.info("Key agreement started, closing " + close.size() + eventBus.broadcast(new CloseSyncConnectionsEvent(ID));
" connections");
}
for (DuplexTransportConnection conn : close) tryToClose(conn);
} }
@Override @Override
@@ -65,35 +67,12 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
} }
@Override @Override
public boolean contactConnectionOpened(DuplexTransportConnection conn) { public void connectionOpened(DuplexTransportConnection conn) {
boolean accept = true;
synchronized (lock) { synchronized (lock) {
if (keyAgreementInProgress) {
LOG.info("Refusing contact connection during key agreement");
accept = false;
} else {
LOG.info("Accepting contact connection");
connections.add(conn);
}
}
if (!accept) tryToClose(conn);
return accept;
}
@Override
public void keyAgreementConnectionOpened(DuplexTransportConnection conn) {
synchronized (lock) {
LOG.info("Accepting key agreement connection");
connections.add(conn); connections.add(conn);
} if (LOG.isLoggable(INFO)) {
} LOG.info("Connection opened, " + connections.size() + " open");
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getWriter().dispose(false);
conn.getReader().dispose(false, false);
} catch (IOException e) {
logException(LOG, WARNING, e);
} }
} }
@@ -101,8 +80,9 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
public void connectionClosed(DuplexTransportConnection conn) { public void connectionClosed(DuplexTransportConnection conn) {
synchronized (lock) { synchronized (lock) {
connections.remove(conn); connections.remove(conn);
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO)) {
LOG.info("Connection closed, " + connections.size() + " open"); LOG.info("Connection closed, " + connections.size() + " open");
}
} }
} }

View File

@@ -232,10 +232,9 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
return; return;
} }
LOG.info("Connection received"); LOG.info("Connection received");
if (connectionLimiter.contactConnectionOpened(conn)) { connectionLimiter.connectionOpened(conn);
backoff.reset(); backoff.reset();
callback.handleConnection(conn); callback.handleConnection(conn);
}
if (!running) return; if (!running) return;
} }
} }
@@ -327,8 +326,8 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
String uuid = p.get(PROP_UUID); String uuid = p.get(PROP_UUID);
if (isNullOrEmpty(uuid)) return null; if (isNullOrEmpty(uuid)) return null;
DuplexTransportConnection conn = connect(address, uuid); DuplexTransportConnection conn = connect(address, uuid);
if (conn == null) return null; if (conn != null) connectionLimiter.connectionOpened(conn);
return connectionLimiter.contactConnectionOpened(conn) ? conn : null; return conn;
} }
@Override @Override
@@ -384,7 +383,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
LOG.info("Connecting to key agreement UUID " + uuid); LOG.info("Connecting to key agreement UUID " + uuid);
conn = connect(address, uuid); conn = connect(address, uuid);
} }
if (conn != null) connectionLimiter.keyAgreementConnectionOpened(conn); if (conn != null) connectionLimiter.connectionOpened(conn);
return conn; return conn;
} }
@@ -453,7 +452,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
public KeyAgreementConnection accept() throws IOException { public KeyAgreementConnection accept() throws IOException {
DuplexTransportConnection conn = acceptConnection(ss); DuplexTransportConnection conn = acceptConnection(ss);
if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection"); if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
connectionLimiter.keyAgreementConnectionOpened(conn); connectionLimiter.connectionOpened(conn);
return new KeyAgreementConnection(conn, ID); return new KeyAgreementConnection(conn, ID);
} }

View File

@@ -11,6 +11,7 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
@@ -19,6 +20,7 @@ import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent; import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
@@ -73,6 +75,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final EventBus eventBus; private final EventBus eventBus;
private final Clock clock; private final Clock clock;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency, maxIdleTime; private final int maxLatency, maxIdleTime;
private final StreamWriter streamWriter; private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter; private final SyncRecordWriter recordWriter;
@@ -90,14 +93,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency, EventBus eventBus, Clock clock, ContactId contactId,
int maxIdleTime, StreamWriter streamWriter, TransportId transportId, int maxLatency, int maxIdleTime,
SyncRecordWriter recordWriter, @Nullable Priority priority) { StreamWriter streamWriter, SyncRecordWriter recordWriter,
@Nullable Priority priority) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.clock = clock; this.clock = clock;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime; this.maxIdleTime = maxIdleTime;
this.streamWriter = streamWriter; this.streamWriter = streamWriter;
@@ -230,6 +235,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) { } else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e; LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt(); if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
} }
} }

View File

@@ -11,11 +11,13 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException; import java.io.IOException;
@@ -56,6 +58,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final Executor dbExecutor; private final Executor dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency; private final int maxLatency;
private final StreamWriter streamWriter; private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter; private final SyncRecordWriter recordWriter;
@@ -65,12 +68,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, int maxLatency, EventBus eventBus, ContactId contactId, TransportId transportId,
StreamWriter streamWriter, SyncRecordWriter recordWriter) { int maxLatency, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.streamWriter = streamWriter; this.streamWriter = streamWriter;
this.recordWriter = recordWriter; this.recordWriter = recordWriter;
@@ -123,6 +128,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) { } else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e; LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt(); if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
} }
} }

View File

@@ -5,6 +5,7 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.PriorityHandler;
import org.briarproject.bramble.api.sync.SyncRecordReader; import org.briarproject.bramble.api.sync.SyncRecordReader;
@@ -58,23 +59,23 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
} }
@Override @Override
public SyncSession createSimplexOutgoingSession(ContactId c, public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, StreamWriter streamWriter) { int maxLatency, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream(); OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter = SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out); recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, streamWriter, recordWriter); maxLatency, streamWriter, recordWriter);
} }
@Override @Override
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxIdleTime, StreamWriter streamWriter, int maxLatency, int maxIdleTime, StreamWriter streamWriter,
@Nullable Priority priority) { @Nullable Priority priority) {
OutputStream out = streamWriter.getOutputStream(); OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter = SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out); recordWriterFactory.createRecordWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
maxLatency, maxIdleTime, streamWriter, recordWriter, priority); maxLatency, maxIdleTime, streamWriter, recordWriter, priority);
} }
} }

View File

@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
@@ -23,6 +24,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getMessage;
import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTransportId;
public class SimplexOutgoingSessionTest extends BrambleMockTestCase { public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@@ -36,14 +38,15 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final Executor dbExecutor = new ImmediateExecutor(); private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = getContactId(); private final ContactId contactId = getContactId();
private final TransportId transportId = getTransportId();
private final Message message = getMessage(new GroupId(getRandomId())); private final Message message = getMessage(new GroupId(getRandomId()));
private final MessageId messageId = message.getId(); private final MessageId messageId = message.getId();
@Test @Test
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
recordWriter); streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false);
@@ -76,8 +79,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
public void testSomethingToSend() throws Exception { public void testSomethingToSend() throws Exception {
Ack ack = new Ack(singletonList(messageId)); Ack ack = new Ack(singletonList(messageId));
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
recordWriter); streamWriter, recordWriter);
Transaction ackTxn = new Transaction(null, false); Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false);

View File

@@ -56,7 +56,7 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
@Override @Override
public DuplexPlugin createPlugin(PluginCallback callback) { public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter = BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl(); new BluetoothConnectionLimiterImpl(eventBus);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL, Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE); MAX_POLLING_INTERVAL, BACKOFF_BASE);
JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(connectionLimiter, JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(connectionLimiter,