Compare commits

..

7 Commits

Author SHA1 Message Date
akwizgran
80fa7d29b2 Poll for connections in series, with a delay between attempts.
The delay reduces interference between Bluetooth and wifi.
2020-05-08 18:02:31 +01:00
akwizgran
0db14bd9ad Delegate all other methods to wrapped InputStream. 2020-05-08 16:25:55 +01:00
akwizgran
dd049012ce Add javadoc. 2020-05-08 16:25:55 +01:00
akwizgran
c382ce921c Add unit test for TimeoutInputStream. 2020-05-08 16:25:54 +01:00
akwizgran
639dd43388 Only check timeouts when we have some streams to monitor. 2020-05-08 16:25:54 +01:00
akwizgran
99adf37deb Add timeout monitor for Bluetooth connections. 2020-05-08 16:25:54 +01:00
akwizgran
78391c604b Use keepalives to detect dead connections. 2020-05-08 16:25:54 +01:00
27 changed files with 185 additions and 472 deletions

View File

@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -78,11 +79,12 @@ class AndroidBluetoothPlugin extends BluetoothPlugin<BluetoothServerSocket> {
AndroidBluetoothPlugin(BluetoothConnectionLimiter connectionLimiter,
TimeoutMonitor timeoutMonitor, Executor ioExecutor,
SecureRandom secureRandom, AndroidExecutor androidExecutor,
Context appContext, Clock clock, Backoff backoff,
PluginCallback callback, int maxLatency, int maxIdleTime) {
super(connectionLimiter, timeoutMonitor, ioExecutor, secureRandom,
backoff, callback, maxLatency, maxIdleTime);
ScheduledExecutorService scheduler, SecureRandom secureRandom,
AndroidExecutor androidExecutor, Context appContext, Clock clock,
Backoff backoff, PluginCallback callback, int maxLatency,
int maxIdleTime) {
super(connectionLimiter, timeoutMonitor, ioExecutor, scheduler,
secureRandom, backoff, callback, maxLatency, maxIdleTime);
this.androidExecutor = androidExecutor;
this.appContext = appContext;
this.clock = clock;

View File

@@ -16,6 +16,7 @@ import org.briarproject.bramble.api.system.Clock;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.Immutable;
@@ -32,6 +33,7 @@ public class AndroidBluetoothPluginFactory implements DuplexPluginFactory {
private static final double BACKOFF_BASE = 1.2;
private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final AndroidExecutor androidExecutor;
private final Context appContext;
private final SecureRandom secureRandom;
@@ -41,10 +43,12 @@ public class AndroidBluetoothPluginFactory implements DuplexPluginFactory {
private final BackoffFactory backoffFactory;
public AndroidBluetoothPluginFactory(Executor ioExecutor,
AndroidExecutor androidExecutor, Context appContext,
SecureRandom secureRandom, EventBus eventBus, Clock clock,
TimeoutMonitor timeoutMonitor, BackoffFactory backoffFactory) {
ScheduledExecutorService scheduler, AndroidExecutor androidExecutor,
Context appContext, SecureRandom secureRandom, EventBus eventBus,
Clock clock, TimeoutMonitor timeoutMonitor,
BackoffFactory backoffFactory) {
this.ioExecutor = ioExecutor;
this.scheduler = scheduler;
this.androidExecutor = androidExecutor;
this.appContext = appContext;
this.secureRandom = secureRandom;
@@ -67,12 +71,12 @@ public class AndroidBluetoothPluginFactory implements DuplexPluginFactory {
@Override
public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl(eventBus, clock);
new BluetoothConnectionLimiterImpl();
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE);
AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin(
connectionLimiter, timeoutMonitor, ioExecutor, secureRandom,
androidExecutor, appContext, clock, backoff,
connectionLimiter, timeoutMonitor, ioExecutor, scheduler,
secureRandom, androidExecutor, appContext, clock, backoff,
callback, MAX_LATENCY, MAX_IDLE_TIME);
eventBus.addListener(plugin);
return plugin;

View File

@@ -50,7 +50,7 @@ class AndroidBluetoothTransportConnection
try {
socket.close();
} finally {
connectionLimiter.connectionClosed(this, exception);
connectionLimiter.connectionClosed(this);
}
}
}

View File

@@ -2,7 +2,6 @@ package org.briarproject.bramble.api.sync;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.InputStream;
@@ -12,9 +11,9 @@ public interface SyncSessionFactory {
SyncSession createIncomingSession(ContactId c, InputStream in);
SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, StreamWriter streamWriter);
SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency,
StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, int maxIdleTime, StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, StreamWriter streamWriter);
}

View File

@@ -1,26 +0,0 @@
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when all sync connections using a given
* transport should be closed.
*/
@Immutable
@NotNullByDefault
public class CloseSyncConnectionsEvent extends Event {
private final TransportId transportId;
public CloseSyncConnectionsEvent(TransportId transportId) {
this.transportId = transportId;
}
public TransportId getTransportId() {
return transportId;
}
}

View File

@@ -11,6 +11,11 @@ public interface Clock {
*/
long currentTimeMillis();
/**
* @see System#nanoTime()
*/
long nanoTime();
/**
* @see Thread#sleep(long)
*/

View File

@@ -16,6 +16,11 @@ public class ArrayClock implements Clock {
return times[index++];
}
@Override
public long nanoTime() {
return times[index++] * 1_000_000;
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);

View File

@@ -17,6 +17,11 @@ public class SettableClock implements Clock {
return time.get();
}
@Override
public long nanoTime() {
return time.get() * 1_000_000;
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);

View File

@@ -13,28 +13,28 @@ class TimeoutInputStream extends InputStream {
private final Clock clock;
private final InputStream in;
private final long timeoutMs;
private final long timeoutNs;
private final CloseListener listener;
private final Object lock = new Object();
@GuardedBy("lock")
private long readStartedMs = -1;
private long readStartedNs = -1;
TimeoutInputStream(Clock clock, InputStream in, long timeoutMs,
TimeoutInputStream(Clock clock, InputStream in, long timeoutNs,
CloseListener listener) {
this.clock = clock;
this.in = in;
this.timeoutMs = timeoutMs;
this.timeoutNs = timeoutNs;
this.listener = listener;
}
@Override
public int read() throws IOException {
synchronized (lock) {
readStartedMs = clock.currentTimeMillis();
readStartedNs = clock.nanoTime();
}
int input = in.read();
synchronized (lock) {
readStartedMs = -1;
readStartedNs = -1;
}
return input;
}
@@ -47,11 +47,11 @@ class TimeoutInputStream extends InputStream {
@Override
public int read(byte[] b, int off, int len) throws IOException {
synchronized (lock) {
readStartedMs = clock.currentTimeMillis();
readStartedNs = clock.nanoTime();
}
int read = in.read(b, off, len);
synchronized (lock) {
readStartedMs = -1;
readStartedNs = -1;
}
return read;
}
@@ -92,8 +92,8 @@ class TimeoutInputStream extends InputStream {
boolean hasTimedOut() {
synchronized (lock) {
return readStartedMs != -1 &&
clock.currentTimeMillis() - readStartedMs > timeoutMs;
return readStartedNs != -1 &&
clock.nanoTime() - readStartedNs > timeoutNs;
}
}

View File

@@ -52,7 +52,7 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
public InputStream createTimeoutInputStream(InputStream in,
long timeoutMs) {
TimeoutInputStream stream = new TimeoutInputStream(clock, in,
timeoutMs, this::removeStream);
timeoutMs * 1_000_000, this::removeStream);
synchronized (lock) {
if (streams.isEmpty()) {
task = scheduler.scheduleWithFixedDelay(this::checkTimeouts,

View File

@@ -130,8 +130,8 @@ class ConnectionManagerImpl implements ConnectionManager {
TransportConnectionWriter w) throws IOException {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx);
return syncSessionFactory.createSimplexOutgoingSession(
requireNonNull(ctx.getContactId()), ctx.getTransportId(),
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createSimplexOutgoingSession(c,
w.getMaxLatency(), streamWriter);
}
@@ -139,8 +139,8 @@ class ConnectionManagerImpl implements ConnectionManager {
TransportConnectionWriter w) throws IOException {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx);
return syncSessionFactory.createDuplexOutgoingSession(
requireNonNull(ctx.getContactId()), ctx.getTransportId(),
ContactId c = requireNonNull(ctx.getContactId());
return syncSessionFactory.createDuplexOutgoingSession(c,
w.getMaxLatency(), w.getMaxIdleTime(), streamWriter);
}

View File

@@ -3,30 +3,9 @@ package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
@NotNullByDefault
interface BluetoothConnectionLimiter {
/**
* How long a connection must remain open before it's considered stable.
*/
long STABILITY_PERIOD_MS = SECONDS.toMillis(90);
/**
* The minimum interval between attempts to raise the connection limit.
* This is longer than {@link #STABILITY_PERIOD_MS} so we don't start
* another attempt before knowing the outcome of the last one.
*/
long MIN_ATTEMPT_INTERVAL_MS = MINUTES.toMillis(2);
/**
* The maximum interval between attempts to raise the connection limit.
*/
long MAX_ATTEMPT_INTERVAL_MS = DAYS.toMillis(2);
/**
* Informs the limiter that key agreement has started.
*/
@@ -44,12 +23,12 @@ interface BluetoothConnectionLimiter {
boolean canOpenContactConnection();
/**
* Informs the limiter that a contact connection has been opened.
* Informs the limiter that a contact connection has been opened. The
* limiter may close the new connection if key agreement is in progress.
* <p/>
* Returns true if the connection is allowed.
* Returns false if the limiter has closed the new connection.
*/
boolean contactConnectionOpened(DuplexTransportConnection conn,
boolean incoming);
boolean contactConnectionOpened(DuplexTransportConnection conn);
/**
* Informs the limiter that a key agreement connection has been opened.
@@ -58,13 +37,11 @@ interface BluetoothConnectionLimiter {
/**
* Informs the limiter that the given connection has been closed.
*
* @param exception True if the connection was closed due to an exception.
*/
void connectionClosed(DuplexTransportConnection conn, boolean exception);
void connectionClosed(DuplexTransportConnection conn);
/**
* Informs the limiter that the Bluetooth adapter has been disabled.
* Informs the limiter that all connections have been closed.
*/
void bluetoothDisabled();
void allConnectionsClosed();
}

View File

@@ -1,59 +1,46 @@
package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.system.Clock;
import java.util.Iterator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.lang.Math.min;
import static java.util.logging.Level.INFO;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.plugin.BluetoothConstants.ID;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.util.LogUtils.logException;
@NotNullByDefault
@ThreadSafe
class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
private static final Logger LOG =
getLogger(BluetoothConnectionLimiterImpl.class.getName());
private final EventBus eventBus;
private final Clock clock;
Logger.getLogger(BluetoothConnectionLimiterImpl.class.getName());
private final Object lock = new Object();
@GuardedBy("lock")
private final List<ConnectionRecord> connections = new LinkedList<>();
@GuardedBy("lock")
// The following are locking: lock
private final LinkedList<DuplexTransportConnection> connections =
new LinkedList<>();
private boolean keyAgreementInProgress = false;
@GuardedBy("lock")
private int connectionLimit = 1;
@GuardedBy("lock")
private long timeOfLastAttempt = 0,
attemptInterval = MIN_ATTEMPT_INTERVAL_MS;
@Inject
BluetoothConnectionLimiterImpl(EventBus eventBus, Clock clock) {
this.eventBus = eventBus;
this.clock = clock;
}
@Override
public void keyAgreementStarted() {
List<DuplexTransportConnection> close;
synchronized (lock) {
keyAgreementInProgress = true;
close = new ArrayList<>(connections);
connections.clear();
}
LOG.info("Key agreement started");
eventBus.broadcast(new CloseSyncConnectionsEvent(ID));
if (LOG.isLoggable(INFO)) {
LOG.info("Key agreement started, closing " + close.size() +
" connections");
}
for (DuplexTransportConnection conn : close) tryToClose(conn);
}
@Override
@@ -68,128 +55,62 @@ class BluetoothConnectionLimiterImpl implements BluetoothConnectionLimiter {
public boolean canOpenContactConnection() {
synchronized (lock) {
if (keyAgreementInProgress) {
LOG.info("Refusing contact connection during key agreement");
LOG.info("Can't open contact connection during key agreement");
return false;
} else {
long now = clock.currentTimeMillis();
return isContactConnectionAllowedByLimit(now);
LOG.info("Can open contact connection");
return true;
}
}
}
@Override
public boolean contactConnectionOpened(DuplexTransportConnection conn,
boolean incoming) {
public boolean contactConnectionOpened(DuplexTransportConnection conn) {
boolean accept = true;
synchronized (lock) {
if (keyAgreementInProgress) {
LOG.info("Refusing contact connection during key agreement");
return false;
accept = false;
} else {
long now = clock.currentTimeMillis();
if (incoming || isContactConnectionAllowedByLimit(now)) {
connections.add(new ConnectionRecord(conn, now));
if (!incoming && connections.size() > connectionLimit) {
LOG.info("Attempting to raise connection limit");
timeOfLastAttempt = now;
}
return true;
} else {
return false;
}
LOG.info("Accepting contact connection");
connections.add(conn);
}
}
if (!accept) tryToClose(conn);
return accept;
}
@Override
public void keyAgreementConnectionOpened(DuplexTransportConnection conn) {
synchronized (lock) {
LOG.info("Accepting key agreement connection");
connections.add(
new ConnectionRecord(conn, clock.currentTimeMillis()));
connections.add(conn);
}
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getWriter().dispose(false);
conn.getReader().dispose(false, false);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
}
@Override
public void connectionClosed(DuplexTransportConnection conn,
boolean exception) {
public void connectionClosed(DuplexTransportConnection conn) {
synchronized (lock) {
Iterator<ConnectionRecord> it = connections.iterator();
while (it.hasNext()) {
if (it.next().connection == conn) {
long now = clock.currentTimeMillis();
if (exception) connectionFailed(now);
else considerRaisingConnectionLimit(now);
it.remove();
break;
}
}
connections.remove(conn);
if (LOG.isLoggable(INFO))
LOG.info("Connection closed, " + connections.size() + " open");
}
}
@Override
public void bluetoothDisabled() {
public void allConnectionsClosed() {
synchronized (lock) {
LOG.info("Bluetooth disabled");
considerRaisingConnectionLimit(clock.currentTimeMillis());
connections.clear();
}
}
@GuardedBy("lock")
private boolean isContactConnectionAllowedByLimit(long now) {
considerRaisingConnectionLimit(now);
if (connections.size() > connectionLimit) {
LOG.info("Refusing contact connection, above limit");
return false;
} else if (connections.size() < connectionLimit) {
LOG.info("Allowing contact connection, below limit");
return true;
} else if (now - timeOfLastAttempt >= attemptInterval) {
LOG.info("Allowing contact connection, at limit");
return true;
} else {
LOG.info("Refusing contact connection, at limit");
return false;
}
}
@GuardedBy("lock")
private void considerRaisingConnectionLimit(long now) {
int stable = 0;
for (ConnectionRecord rec : connections) {
if (now - rec.timeOpened >= STABILITY_PERIOD_MS) stable++;
}
if (stable > connectionLimit) {
LOG.info("Raising connection limit");
connectionLimit = stable;
attemptInterval = MIN_ATTEMPT_INTERVAL_MS;
}
if (LOG.isLoggable(INFO)) {
LOG.info(stable + " connections are stable, limit is "
+ connectionLimit);
}
}
@GuardedBy("lock")
private void connectionFailed(long now) {
if (connections.size() > connectionLimit &&
now - timeOfLastAttempt < STABILITY_PERIOD_MS) {
LOG.info("Connection failed above limit, increasing interval");
attemptInterval = min(attemptInterval * 2, MAX_ATTEMPT_INTERVAL_MS);
}
}
private static final class ConnectionRecord {
private final DuplexTransportConnection connection;
private final long timeOpened;
private ConnectionRecord(DuplexTransportConnection connection,
long timeOpened) {
this.connection = connection;
this.timeOpened = timeOpened;
LOG.info("All connections closed");
}
}
}

View File

@@ -31,13 +31,17 @@ import org.briarproject.bramble.api.settings.event.SettingsUpdatedEvent;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
@@ -60,10 +64,18 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
private static final Logger LOG =
getLogger(BluetoothPlugin.class.getName());
/**
* The delay between connection attempts when polling. This reduces
* interference between Bluetooth and wifi.
*/
private static final long CONNECTION_ATTEMPT_INTERVAL_MS =
SECONDS.toMillis(5);
final BluetoothConnectionLimiter connectionLimiter;
final TimeoutMonitor timeoutMonitor;
private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final SecureRandom secureRandom;
private final Backoff backoff;
private final PluginCallback callback;
@@ -108,11 +120,13 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
BluetoothPlugin(BluetoothConnectionLimiter connectionLimiter,
TimeoutMonitor timeoutMonitor, Executor ioExecutor,
SecureRandom secureRandom, Backoff backoff,
PluginCallback callback, int maxLatency, int maxIdleTime) {
ScheduledExecutorService scheduler, SecureRandom secureRandom,
Backoff backoff, PluginCallback callback, int maxLatency,
int maxIdleTime) {
this.connectionLimiter = connectionLimiter;
this.timeoutMonitor = timeoutMonitor;
this.ioExecutor = ioExecutor;
this.scheduler = scheduler;
this.secureRandom = secureRandom;
this.backoff = backoff;
this.callback = callback;
@@ -130,7 +144,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
void onAdapterDisabled() {
LOG.info("Bluetooth disabled");
tryToClose(socket);
connectionLimiter.bluetoothDisabled();
connectionLimiter.allConnectionsClosed();
callback.transportDisabled();
}
@@ -232,25 +246,14 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
return;
}
LOG.info("Connection received");
if (connectionLimiter.contactConnectionOpened(conn, true)) {
if (connectionLimiter.contactConnectionOpened(conn)) {
backoff.reset();
callback.handleConnection(conn);
} else {
tryToClose(conn);
}
if (!running) return;
}
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getWriter().dispose(false);
conn.getReader().dispose(false, false);
} catch (IOException e) {
logException(LOG, WARNING, e);
}
}
@Override
public void stop() {
running = false;
@@ -279,21 +282,31 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
properties) {
if (!isRunning() || !shouldAllowContactConnections()) return;
backoff.increment();
for (Pair<TransportProperties, ConnectionHandler> p : properties) {
connect(p.getFirst(), p.getSecond());
LinkedList<Pair<TransportProperties, ConnectionHandler>> connectable =
new LinkedList<>();
for (Pair<TransportProperties, ConnectionHandler> pair : properties) {
TransportProperties p = pair.getFirst();
if (isNullOrEmpty(p.get(PROP_ADDRESS))) continue;
if (isNullOrEmpty(p.get(PROP_UUID))) continue;
connectable.add(pair);
}
if (!connectable.isEmpty()) poll(connectable);
}
private void connect(TransportProperties p, ConnectionHandler h) {
String address = p.get(PROP_ADDRESS);
if (isNullOrEmpty(address)) return;
String uuid = p.get(PROP_UUID);
if (isNullOrEmpty(uuid)) return;
private void poll(LinkedList<Pair<TransportProperties, ConnectionHandler>>
connectable) {
ioExecutor.execute(() -> {
DuplexTransportConnection d = createConnection(p);
if (!isRunning() || !shouldAllowContactConnections()) return;
Pair<TransportProperties, ConnectionHandler> pair =
connectable.removeFirst();
DuplexTransportConnection d = createConnection(pair.getFirst());
if (d != null) {
backoff.reset();
h.handleConnection(d);
pair.getSecond().handleConnection(d);
}
if (!connectable.isEmpty()) {
scheduler.schedule(() -> poll(connectable),
CONNECTION_ATTEMPT_INTERVAL_MS, MILLISECONDS);
}
});
}
@@ -339,12 +352,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
if (isNullOrEmpty(uuid)) return null;
DuplexTransportConnection conn = connect(address, uuid);
if (conn == null) return null;
if (connectionLimiter.contactConnectionOpened(conn, false)) {
return conn;
} else {
tryToClose(conn);
return null;
}
return connectionLimiter.contactConnectionOpened(conn) ? conn : null;
}
@Override

View File

@@ -11,7 +11,6 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer;
@@ -19,7 +18,6 @@ import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
@@ -73,7 +71,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final EventBus eventBus;
private final Clock clock;
private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency, maxIdleTime;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
@@ -89,15 +86,14 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId,
TransportId transportId, int maxLatency, int maxIdleTime,
StreamWriter streamWriter, SyncRecordWriter recordWriter) {
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
int maxIdleTime, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.clock = clock;
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.streamWriter = streamWriter;
@@ -227,9 +223,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
}
}

View File

@@ -11,13 +11,11 @@ import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
@@ -58,7 +56,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final Executor dbExecutor;
private final EventBus eventBus;
private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
@@ -68,14 +65,12 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, TransportId transportId,
int maxLatency, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
EventBus eventBus, ContactId contactId, int maxLatency,
StreamWriter streamWriter, SyncRecordWriter recordWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
this.streamWriter = streamWriter;
this.recordWriter = recordWriter;
@@ -128,9 +123,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof CloseSyncConnectionsEvent) {
CloseSyncConnectionsEvent c = (CloseSyncConnectionsEvent) e;
if (c.getTransportId().equals(transportId)) interrupt();
}
}

View File

@@ -5,7 +5,6 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.SyncRecordReader;
import org.briarproject.bramble.api.sync.SyncRecordReaderFactory;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
@@ -54,23 +53,22 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
}
@Override
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
public SyncSession createSimplexOutgoingSession(ContactId c,
int maxLatency, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
maxLatency, streamWriter, recordWriter);
}
@Override
public SyncSession createDuplexOutgoingSession(ContactId c,
TransportId t, int maxLatency, int maxIdleTime,
StreamWriter streamWriter) {
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
maxLatency, maxIdleTime, streamWriter, recordWriter);
}
}

View File

@@ -12,6 +12,11 @@ public class SystemClock implements Clock {
return System.currentTimeMillis();
}
@Override
public long nanoTime() {
return System.nanoTime();
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);

View File

@@ -2420,6 +2420,11 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
return time;
}
@Override
public long nanoTime() {
return time * 1_000_000;
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);

View File

@@ -36,7 +36,7 @@ public class TimeoutInputStreamTest extends BrambleTestCase {
in = new UnresponsiveInputStream();
listenerCalled = new AtomicBoolean(false);
stream = new TimeoutInputStream(new SettableClock(time), in,
TIMEOUT_MS, stream -> listenerCalled.set(true));
TIMEOUT_MS * 1_000_000, stream -> listenerCalled.set(true));
readReturned = new CountDownLatch(1);
}

View File

@@ -1,182 +0,0 @@
package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.SettableClock;
import org.jmock.Expectations;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicLong;
import static org.briarproject.bramble.plugin.bluetooth.BluetoothConnectionLimiter.MIN_ATTEMPT_INTERVAL_MS;
import static org.briarproject.bramble.plugin.bluetooth.BluetoothConnectionLimiter.STABILITY_PERIOD_MS;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class BluetoothConnectionLimiterImplTest extends BrambleMockTestCase {
private final EventBus eventBus = context.mock(EventBus.class);
private final DuplexTransportConnection conn1 =
context.mock(DuplexTransportConnection.class, "conn1");
private final DuplexTransportConnection conn2 =
context.mock(DuplexTransportConnection.class, "conn2");
private final DuplexTransportConnection conn3 =
context.mock(DuplexTransportConnection.class, "conn3");
private final long now = System.currentTimeMillis();
private AtomicLong time;
private BluetoothConnectionLimiter limiter;
@Before
public void setUp() {
time = new AtomicLong(now);
Clock clock = new SettableClock(time);
limiter = new BluetoothConnectionLimiterImpl(eventBus, clock);
}
@Test
public void testLimiterDoesNotAllowContactConnectionsDuringKeyAgreement() {
assertTrue(limiter.canOpenContactConnection());
expectCloseSyncConnectionsEvent();
limiter.keyAgreementStarted();
assertFalse(limiter.canOpenContactConnection());
limiter.keyAgreementEnded();
assertTrue(limiter.canOpenContactConnection());
}
@Test
public void testLimiterAllowsAttemptToRaiseLimitAtStartup() {
// First outgoing connection is allowed - we're below the limit of 1
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn1, false));
// Second outgoing connection is allowed - it's time to try raising
// the limit to 2
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn2, false));
// Third outgoing connection is not allowed - we're above the limit of 1
assertFalse(limiter.canOpenContactConnection());
}
@Test
public void testLimiterAllowsThirdConnectionAfterFirstTwoAreClosed() {
// First outgoing connection is allowed - we're below the limit of 1
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn1, false));
// Second outgoing connection is allowed - it's time to try raising
// the limit to 2
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn2, false));
// Third outgoing connection is not allowed - we're above the limit of 1
assertFalse(limiter.canOpenContactConnection());
// Close the first connection
limiter.connectionClosed(conn1, false);
// Third outgoing connection is not allowed - we're at the limit of 1
assertFalse(limiter.canOpenContactConnection());
// Close the second connection
limiter.connectionClosed(conn2, false);
// Third outgoing connection is allowed - we're below the limit of 1
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn3, false));
}
@Test
public void testLimiterRaisesLimitWhenConnectionsAreStable() {
// First outgoing connection is allowed - we're below the limit of 1
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn1, false));
// Second outgoing connection is allowed - it's time to try raising
// the limit to 2
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn2, false));
// Third outgoing connection is not allowed - we're above the limit of 1
assertFalse(limiter.canOpenContactConnection());
// Time passes
time.set(now + STABILITY_PERIOD_MS);
// Third outgoing connection is still not allowed - first two are now
// stable so limit is raised to 2, but we're already at the new limit
assertFalse(limiter.canOpenContactConnection());
// Time passes
time.set(now + MIN_ATTEMPT_INTERVAL_MS);
// Third outgoing connection is allowed - it's time to try raising
// the limit to 3
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn3, false));
// Fourth outgoing connection is not allowed - we're above the limit
// of 2
assertFalse(limiter.canOpenContactConnection());
}
@Test
public void testLimiterIncreasesIntervalWhenConnectionFailsAboveLimit() {
// First outgoing connection is allowed - we're below the limit of 1
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn1, false));
// Time passes
time.set(now + 1);
// Second outgoing connection is allowed - it's time to try raising
// the limit to 2
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn2, false));
// Time passes - the first connection is stable, the second isn't
time.set(now + STABILITY_PERIOD_MS);
// First connection fails. The second connection isn't stable yet, so
// the limiter considers this a failed attempt and doubles the interval
// between attempts
limiter.connectionClosed(conn1, true);
// Third outgoing connection is not allowed - we're still at the limit
// of 1
assertFalse(limiter.canOpenContactConnection());
// Time passes - nearly time for the second attempt
time.set(now + MIN_ATTEMPT_INTERVAL_MS * 2);
// Third outgoing connection is not allowed - we're still at the limit
// of 1
assertFalse(limiter.canOpenContactConnection());
// Time passes - now it's time for the second attempt
time.set(now + 1 + MIN_ATTEMPT_INTERVAL_MS * 2);
// Third outgoing connection is allowed - it's time to try raising the
// limit to 2 again
assertTrue(limiter.canOpenContactConnection());
assertTrue(limiter.contactConnectionOpened(conn3, false));
}
private void expectCloseSyncConnectionsEvent() {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(
CloseSyncConnectionsEvent.class)));
}});
}
}

View File

@@ -4,7 +4,6 @@ import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message;
@@ -24,7 +23,6 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getMessage;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTransportId;
public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@@ -38,15 +36,14 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = getContactId();
private final TransportId transportId = getTransportId();
private final Message message = getMessage(new GroupId(getRandomId()));
private final MessageId messageId = message.getId();
@Test
public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter);
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter,
recordWriter);
Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false);
@@ -79,8 +76,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
public void testSomethingToSend() throws Exception {
Ack ack = new Ack(singletonList(messageId));
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter);
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter,
recordWriter);
Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false);

View File

@@ -10,7 +10,7 @@ import org.briarproject.bramble.api.plugin.PluginConfig;
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory;
import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory;
import org.briarproject.bramble.api.reliability.ReliabilityLayerFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.Scheduler;
import org.briarproject.bramble.plugin.bluetooth.JavaBluetoothPluginFactory;
import org.briarproject.bramble.plugin.modem.ModemPluginFactory;
import org.briarproject.bramble.plugin.tcp.LanTcpPluginFactory;
@@ -19,6 +19,7 @@ import org.briarproject.bramble.plugin.tcp.WanTcpPluginFactory;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import dagger.Module;
import dagger.Provides;
@@ -31,12 +32,13 @@ public class DesktopPluginModule extends PluginModule {
@Provides
PluginConfig getPluginConfig(@IoExecutor Executor ioExecutor,
@Scheduler ScheduledExecutorService scheduler,
SecureRandom random, BackoffFactory backoffFactory,
ReliabilityLayerFactory reliabilityFactory,
ShutdownManager shutdownManager, EventBus eventBus, Clock clock,
ShutdownManager shutdownManager, EventBus eventBus,
TimeoutMonitor timeoutMonitor) {
DuplexPluginFactory bluetooth = new JavaBluetoothPluginFactory(
ioExecutor, random, eventBus, clock, timeoutMonitor,
ioExecutor, scheduler, random, eventBus, timeoutMonitor,
backoffFactory);
DuplexPluginFactory modem = new ModemPluginFactory(ioExecutor,
reliabilityFactory);

View File

@@ -10,6 +10,7 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -36,10 +37,11 @@ class JavaBluetoothPlugin extends BluetoothPlugin<StreamConnectionNotifier> {
JavaBluetoothPlugin(BluetoothConnectionLimiter connectionManager,
TimeoutMonitor timeoutMonitor, Executor ioExecutor,
SecureRandom secureRandom, Backoff backoff,
PluginCallback callback, int maxLatency, int maxIdleTime) {
super(connectionManager, timeoutMonitor, ioExecutor, secureRandom,
backoff, callback, maxLatency, maxIdleTime);
ScheduledExecutorService scheduler, SecureRandom secureRandom,
Backoff backoff, PluginCallback callback, int maxLatency,
int maxIdleTime) {
super(connectionManager, timeoutMonitor, ioExecutor, scheduler,
secureRandom, backoff, callback, maxLatency, maxIdleTime);
}
@Override

View File

@@ -9,10 +9,10 @@ import org.briarproject.bramble.api.plugin.PluginCallback;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexPluginFactory;
import org.briarproject.bramble.api.system.Clock;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.Immutable;
@@ -29,19 +29,20 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
private static final double BACKOFF_BASE = 1.2;
private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final SecureRandom secureRandom;
private final EventBus eventBus;
private final Clock clock;
private final TimeoutMonitor timeoutMonitor;
private final BackoffFactory backoffFactory;
public JavaBluetoothPluginFactory(Executor ioExecutor,
SecureRandom secureRandom, EventBus eventBus, Clock clock,
TimeoutMonitor timeoutMonitor, BackoffFactory backoffFactory) {
ScheduledExecutorService scheduler, SecureRandom secureRandom,
EventBus eventBus, TimeoutMonitor timeoutMonitor,
BackoffFactory backoffFactory) {
this.ioExecutor = ioExecutor;
this.scheduler = scheduler;
this.secureRandom = secureRandom;
this.eventBus = eventBus;
this.clock = clock;
this.timeoutMonitor = timeoutMonitor;
this.backoffFactory = backoffFactory;
}
@@ -59,12 +60,12 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
@Override
public DuplexPlugin createPlugin(PluginCallback callback) {
BluetoothConnectionLimiter connectionLimiter =
new BluetoothConnectionLimiterImpl(eventBus, clock);
new BluetoothConnectionLimiterImpl();
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE);
JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(connectionLimiter,
timeoutMonitor, ioExecutor, secureRandom, backoff, callback,
MAX_LATENCY, MAX_IDLE_TIME);
timeoutMonitor, ioExecutor, scheduler, secureRandom, backoff,
callback, MAX_LATENCY, MAX_IDLE_TIME);
eventBus.addListener(plugin);
return plugin;
}

View File

@@ -45,7 +45,7 @@ class JavaBluetoothTransportConnection
try {
stream.close();
} finally {
connectionLimiter.connectionClosed(this, exception);
connectionLimiter.connectionClosed(this);
}
}
}

View File

@@ -127,8 +127,8 @@ public class AppModule {
TimeoutMonitor timeoutMonitor) {
Context appContext = app.getApplicationContext();
DuplexPluginFactory bluetooth = new AndroidBluetoothPluginFactory(
ioExecutor, androidExecutor, appContext, random, eventBus,
clock, timeoutMonitor, backoffFactory);
ioExecutor, scheduler, androidExecutor, appContext, random,
eventBus, clock, timeoutMonitor, backoffFactory);
DuplexPluginFactory tor = new AndroidTorPluginFactory(ioExecutor,
scheduler, appContext, networkManager, locationUtils, eventBus,
torSocketFactory, backoffFactory, resourceProvider,