Compare commits

...

6 Commits

Author SHA1 Message Date
akwizgran
cbb65ec807 Refactor key agreement connection choosing. 2018-02-27 17:51:25 +00:00
akwizgran
53b38d83e8 Ensure connections are closed when contact is removed. 2018-02-27 17:18:29 +00:00
akwizgran
692db742cf Ensure all key agreement connection tasks are stopped. 2018-02-26 14:05:59 +00:00
akwizgran
5b177c9901 Log Bluetooth bonding changes. 2018-02-26 14:03:45 +00:00
akwizgran
c00bf2b2cd Close old connections to stay within limit. 2018-02-23 12:06:54 +00:00
akwizgran
55150fe02a Limit the number of open Bluetooth connections. 2018-02-23 10:13:22 +00:00
23 changed files with 576 additions and 265 deletions

View File

@@ -21,6 +21,8 @@ import org.briarproject.bramble.util.AndroidUtils;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@@ -37,7 +39,16 @@ import static android.bluetooth.BluetoothAdapter.SCAN_MODE_CONNECTABLE_DISCOVERA
import static android.bluetooth.BluetoothAdapter.SCAN_MODE_NONE; import static android.bluetooth.BluetoothAdapter.SCAN_MODE_NONE;
import static android.bluetooth.BluetoothAdapter.STATE_OFF; import static android.bluetooth.BluetoothAdapter.STATE_OFF;
import static android.bluetooth.BluetoothAdapter.STATE_ON; import static android.bluetooth.BluetoothAdapter.STATE_ON;
import static android.bluetooth.BluetoothDevice.ACTION_BOND_STATE_CHANGED;
import static android.bluetooth.BluetoothDevice.BOND_BONDED;
import static android.bluetooth.BluetoothDevice.BOND_BONDING;
import static android.bluetooth.BluetoothDevice.BOND_NONE;
import static android.bluetooth.BluetoothDevice.EXTRA_BOND_STATE;
import static android.bluetooth.BluetoothDevice.EXTRA_DEVICE;
import static android.bluetooth.BluetoothDevice.EXTRA_PREVIOUS_BOND_STATE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.util.PrivacyUtils.scrubMacAddress;
@MethodsNotNullByDefault @MethodsNotNullByDefault
@ParametersNotNullByDefault @ParametersNotNullByDefault
@@ -55,10 +66,12 @@ class AndroidBluetoothPlugin extends BluetoothPlugin<BluetoothServerSocket> {
// Non-null if the plugin started successfully // Non-null if the plugin started successfully
private volatile BluetoothAdapter adapter = null; private volatile BluetoothAdapter adapter = null;
AndroidBluetoothPlugin(Executor ioExecutor, AndroidExecutor androidExecutor, AndroidBluetoothPlugin(BluetoothConnectionManager connectionManager,
Executor ioExecutor, AndroidExecutor androidExecutor,
Context appContext, SecureRandom secureRandom, Backoff backoff, Context appContext, SecureRandom secureRandom, Backoff backoff,
DuplexPluginCallback callback, int maxLatency) { DuplexPluginCallback callback, int maxLatency) {
super(ioExecutor, secureRandom, backoff, callback, maxLatency); super(connectionManager, ioExecutor, secureRandom, backoff, callback,
maxLatency);
this.androidExecutor = androidExecutor; this.androidExecutor = androidExecutor;
this.appContext = appContext; this.appContext = appContext;
} }
@@ -70,6 +83,7 @@ class AndroidBluetoothPlugin extends BluetoothPlugin<BluetoothServerSocket> {
IntentFilter filter = new IntentFilter(); IntentFilter filter = new IntentFilter();
filter.addAction(ACTION_STATE_CHANGED); filter.addAction(ACTION_STATE_CHANGED);
filter.addAction(ACTION_SCAN_MODE_CHANGED); filter.addAction(ACTION_SCAN_MODE_CHANGED);
filter.addAction(ACTION_BOND_STATE_CHANGED);
receiver = new BluetoothStateReceiver(); receiver = new BluetoothStateReceiver();
appContext.registerReceiver(receiver, filter); appContext.registerReceiver(receiver, filter);
} }
@@ -154,7 +168,8 @@ class AndroidBluetoothPlugin extends BluetoothPlugin<BluetoothServerSocket> {
} }
private DuplexTransportConnection wrapSocket(BluetoothSocket s) { private DuplexTransportConnection wrapSocket(BluetoothSocket s) {
return new AndroidBluetoothTransportConnection(this, s); return new AndroidBluetoothTransportConnection(this,
connectionManager, s);
} }
@Override @Override
@@ -165,6 +180,17 @@ class AndroidBluetoothPlugin extends BluetoothPlugin<BluetoothServerSocket> {
@Override @Override
DuplexTransportConnection connectTo(String address, String uuid) DuplexTransportConnection connectTo(String address, String uuid)
throws IOException { throws IOException {
if (LOG.isLoggable(INFO)) {
boolean found = false;
List<String> addresses = new ArrayList<>();
for (BluetoothDevice d : adapter.getBondedDevices()) {
addresses.add(scrubMacAddress(d.getAddress()));
if (d.getAddress().equals(address)) found = true;
}
LOG.info("Bonded devices: " + addresses);
if (found) LOG.info("Connecting to bonded device");
else LOG.info("Connecting to unbonded device");
}
BluetoothDevice d = adapter.getRemoteDevice(address); BluetoothDevice d = adapter.getRemoteDevice(address);
UUID u = UUID.fromString(uuid); UUID u = UUID.fromString(uuid);
BluetoothSocket s = null; BluetoothSocket s = null;
@@ -190,9 +216,12 @@ class AndroidBluetoothPlugin extends BluetoothPlugin<BluetoothServerSocket> {
@Override @Override
public void onReceive(Context ctx, Intent intent) { public void onReceive(Context ctx, Intent intent) {
String action = intent.getAction();
if (ACTION_STATE_CHANGED.equals(action)) {
int state = intent.getIntExtra(EXTRA_STATE, 0); int state = intent.getIntExtra(EXTRA_STATE, 0);
if (state == STATE_ON) onAdapterEnabled(); if (state == STATE_ON) onAdapterEnabled();
else if (state == STATE_OFF) onAdapterDisabled(); else if (state == STATE_OFF) onAdapterDisabled();
} else if (ACTION_SCAN_MODE_CHANGED.equals(action)) {
int scanMode = intent.getIntExtra(EXTRA_SCAN_MODE, 0); int scanMode = intent.getIntExtra(EXTRA_SCAN_MODE, 0);
if (scanMode == SCAN_MODE_NONE) { if (scanMode == SCAN_MODE_NONE) {
LOG.info("Scan mode: None"); LOG.info("Scan mode: None");
@@ -201,6 +230,29 @@ class AndroidBluetoothPlugin extends BluetoothPlugin<BluetoothServerSocket> {
} else if (scanMode == SCAN_MODE_CONNECTABLE_DISCOVERABLE) { } else if (scanMode == SCAN_MODE_CONNECTABLE_DISCOVERABLE) {
LOG.info("Scan mode: Discoverable"); LOG.info("Scan mode: Discoverable");
} }
} else if (ACTION_BOND_STATE_CHANGED.equals(action)) {
BluetoothDevice d = intent.getParcelableExtra(EXTRA_DEVICE);
if (LOG.isLoggable(INFO)) {
LOG.info("Bond state changed for "
+ scrubMacAddress(d.getAddress()));
}
int oldState = intent.getIntExtra(EXTRA_PREVIOUS_BOND_STATE, 0);
if (oldState == BOND_NONE) {
LOG.info("Old state: none");
} else if (oldState == BOND_BONDING) {
LOG.info("Old state: bonding");
} else if (oldState == BOND_BONDED) {
LOG.info("Old state: bonded");
}
int state = intent.getIntExtra(EXTRA_BOND_STATE, 0);
if (state == BOND_NONE) {
LOG.info("New state: none");
} else if (state == BOND_BONDING) {
LOG.info("New state: bonding");
} else if (state == BOND_BONDED) {
LOG.info("New state: bonded");
}
}
} }
} }
} }

View File

@@ -59,11 +59,13 @@ public class AndroidBluetoothPluginFactory implements DuplexPluginFactory {
@Override @Override
public DuplexPlugin createPlugin(DuplexPluginCallback callback) { public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
BluetoothConnectionManager connectionManager =
new BluetoothConnectionManagerImpl();
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL, Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE); MAX_POLLING_INTERVAL, BACKOFF_BASE);
AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin(ioExecutor, AndroidBluetoothPlugin plugin = new AndroidBluetoothPlugin(
androidExecutor, appContext, secureRandom, backoff, callback, connectionManager, ioExecutor, androidExecutor, appContext,
MAX_LATENCY); secureRandom, backoff, callback, MAX_LATENCY);
eventBus.addListener(plugin); eventBus.addListener(plugin);
return plugin; return plugin;
} }

View File

@@ -14,10 +14,14 @@ import java.io.OutputStream;
class AndroidBluetoothTransportConnection class AndroidBluetoothTransportConnection
extends AbstractDuplexTransportConnection { extends AbstractDuplexTransportConnection {
private final BluetoothConnectionManager connectionManager;
private final BluetoothSocket socket; private final BluetoothSocket socket;
AndroidBluetoothTransportConnection(Plugin plugin, BluetoothSocket socket) { AndroidBluetoothTransportConnection(Plugin plugin,
BluetoothConnectionManager connectionManager,
BluetoothSocket socket) {
super(plugin); super(plugin);
this.connectionManager = connectionManager;
this.socket = socket; this.socket = socket;
} }
@@ -33,6 +37,10 @@ class AndroidBluetoothTransportConnection
@Override @Override
protected void closeConnection(boolean exception) throws IOException { protected void closeConnection(boolean exception) throws IOException {
try {
socket.close(); socket.close();
} finally {
connectionManager.connectionClosed(this);
}
} }
} }

View File

@@ -614,7 +614,7 @@ class TorPlugin implements DuplexPlugin, EventHandler, EventListener {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@@ -2,7 +2,7 @@ package org.briarproject.bramble.api.keyagreement;
import org.briarproject.bramble.api.data.BdfList; import org.briarproject.bramble.api.data.BdfList;
import java.util.concurrent.Callable; import java.io.IOException;
/** /**
* An class for managing a particular key agreement listener. * An class for managing a particular key agreement listener.
@@ -24,11 +24,11 @@ public abstract class KeyAgreementListener {
} }
/** /**
* Starts listening for incoming connections, and returns a Callable that * Blocks until an incoming connection is received and returns it.
* will return a KeyAgreementConnection when an incoming connection is *
* received. * @throws IOException if an error occurs or {@link #close()} is called.
*/ */
public abstract Callable<KeyAgreementConnection> listen(); public abstract KeyAgreementConnection accept() throws IOException;
/** /**
* Closes the underlying server socket. * Closes the underlying server socket.

View File

@@ -36,9 +36,9 @@ public interface DuplexPlugin extends Plugin {
/** /**
* Attempts to connect to the remote peer specified in the given descriptor. * Attempts to connect to the remote peer specified in the given descriptor.
* Returns null if no connection can be established within the given time. * Returns null if no connection can be established.
*/ */
@Nullable @Nullable
DuplexTransportConnection createKeyAgreementConnection( DuplexTransportConnection createKeyAgreementConnection(
byte[] remoteCommitment, BdfList descriptor, long timeout); byte[] remoteCommitment, BdfList descriptor);
} }

View File

@@ -0,0 +1,39 @@
package org.briarproject.bramble.keyagreement;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import javax.annotation.Nullable;
interface ConnectionChooser {
/**
* Adds a connection to the set of connections that may be chosen. If
* {@link #stop()} has already been called, the connection will be closed
* immediately.
*/
void addConnection(KeyAgreementConnection c);
/**
* Chooses one of the connections passed to
* {@link #addConnection(KeyAgreementConnection)} and returns it,
* waiting up to the given amount of time for a suitable connection to
* become available. Returns null if the time elapses without a suitable
* connection becoming available.
*
* @param alice true if the local party is Alice
* @param timeout the timeout in milliseconds
* @throws InterruptedException if the thread is interrupted while waiting
* for a suitable connection to become available
*/
@Nullable
KeyAgreementConnection chooseConnection(boolean alice, long timeout)
throws InterruptedException;
/**
* Stops the chooser from accepting new connections. Closes any connections
* already passed to {@link #addConnection(KeyAgreementConnection)}
* and not chosen. Any connections subsequently passed to
* {@link #addConnection(KeyAgreementConnection)} will be closed.
*/
void stop();
}

View File

@@ -0,0 +1,153 @@
package org.briarproject.bramble.keyagreement;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.keyagreement.event.KeyAgreementWaitingEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
@NotNullByDefault
@ThreadSafe
class ConnectionChooserImpl implements ConnectionChooser {
private static final Logger LOG =
Logger.getLogger(ConnectionChooserImpl.class.getName());
private final EventBus eventBus;
private final Clock clock;
private final Object lock = new Object();
private final List<KeyAgreementConnection> connections =
new ArrayList<>(); // Locking: lock
private boolean stopped = false; // Locking: lock
@Inject
ConnectionChooserImpl(EventBus eventBus, Clock clock) {
this.eventBus = eventBus;
this.clock = clock;
}
@Override
public void addConnection(KeyAgreementConnection conn) {
boolean close = false;
synchronized (lock) {
if (stopped) {
// Already stopped, close the connection
close = true;
} else {
connections.add(conn);
lock.notifyAll();
}
}
if (close) tryToClose(conn.getConnection());
}
@Nullable
@Override
public KeyAgreementConnection chooseConnection(boolean alice, long timeout)
throws InterruptedException {
if (alice) return chooseConnectionAlice(timeout);
else return chooseConnectionBob(timeout);
}
@Nullable
private KeyAgreementConnection chooseConnectionAlice(long timeout)
throws InterruptedException {
LOG.info("Choosing connection for Alice");
long now = clock.currentTimeMillis();
long end = now + timeout;
KeyAgreementConnection chosen;
synchronized (lock) {
// Wait until we're stopped, a connection is added, or we time out
while (!stopped && connections.isEmpty() && now < end) {
lock.wait(end - now);
now = clock.currentTimeMillis();
}
if (connections.isEmpty()) {
LOG.info("No suitable connection for Alice");
return null;
}
// Choose the first connection
chosen = connections.remove(0);
}
if (LOG.isLoggable(INFO))
LOG.info("Choosing " + chosen.getTransportId());
return chosen;
}
@Nullable
private KeyAgreementConnection chooseConnectionBob(long timeout)
throws InterruptedException {
LOG.info("Choosing connection for Bob");
// Bob waits here for Alice to scan his QR code, determine her role,
// choose a connection and send her key
eventBus.broadcast(new KeyAgreementWaitingEvent());
long now = clock.currentTimeMillis();
long end = now + timeout;
synchronized (lock) {
while (!stopped && now < end) {
// Check whether any connection has data available
Iterator<KeyAgreementConnection> it = connections.iterator();
while (it.hasNext()) {
KeyAgreementConnection c = it.next();
try {
int available = c.getConnection().getReader()
.getInputStream().available();
if (available > 0) {
if (LOG.isLoggable(INFO))
LOG.info("Choosing " + c.getTransportId());
it.remove();
return c;
}
} catch (IOException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
tryToClose(c.getConnection());
it.remove();
}
}
// Wait for 1 second before checking again
lock.wait(Math.min(1000, end - now));
now = clock.currentTimeMillis();
}
}
LOG.info("No suitable connection for Bob");
return null;
}
@Override
public void stop() {
List<KeyAgreementConnection> unused;
synchronized (lock) {
stopped = true;
unused = new ArrayList<>(connections);
connections.clear();
lock.notifyAll();
}
if (LOG.isLoggable(INFO))
LOG.info("Closing " + unused.size() + " unused connections");
for (KeyAgreementConnection c : unused) tryToClose(c.getConnection());
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getReader().dispose(false, true);
conn.getWriter().dispose(false);
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
}

View File

@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.keyagreement.KeyAgreementListener; import org.briarproject.bramble.api.keyagreement.KeyAgreementListener;
import org.briarproject.bramble.api.keyagreement.Payload; import org.briarproject.bramble.api.keyagreement.Payload;
import org.briarproject.bramble.api.keyagreement.TransportDescriptor; import org.briarproject.bramble.api.keyagreement.TransportDescriptor;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.Plugin; import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.PluginManager;
@@ -16,20 +17,14 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.Clock;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CONNECTION_TIMEOUT; import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CONNECTION_TIMEOUT;
@@ -37,37 +32,31 @@ import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CO
@NotNullByDefault @NotNullByDefault
class KeyAgreementConnector { class KeyAgreementConnector {
interface Callbacks {
void connectionWaiting();
}
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(KeyAgreementConnector.class.getName()); Logger.getLogger(KeyAgreementConnector.class.getName());
private final Callbacks callbacks;
private final Clock clock; private final Clock clock;
private final KeyAgreementCrypto keyAgreementCrypto; private final KeyAgreementCrypto keyAgreementCrypto;
private final PluginManager pluginManager; private final PluginManager pluginManager;
private final CompletionService<KeyAgreementConnection> connect; private final Executor ioExecutor;
private final ConnectionChooser connectionChooser;
private final List<KeyAgreementListener> listeners = new ArrayList<>(); private final List<KeyAgreementListener> listeners =
private final List<Future<KeyAgreementConnection>> pending = new CopyOnWriteArrayList<>();
new ArrayList<>();
private volatile boolean connecting = false; private volatile boolean stopped = false;
private volatile boolean alice = false;
KeyAgreementConnector(Callbacks callbacks, Clock clock, KeyAgreementConnector(Clock clock, KeyAgreementCrypto keyAgreementCrypto,
KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager, PluginManager pluginManager, Executor ioExecutor,
Executor ioExecutor) { ConnectionChooser connectionChooser) {
this.callbacks = callbacks;
this.clock = clock; this.clock = clock;
this.keyAgreementCrypto = keyAgreementCrypto; this.keyAgreementCrypto = keyAgreementCrypto;
this.pluginManager = pluginManager; this.pluginManager = pluginManager;
connect = new ExecutorCompletionService<>(ioExecutor); this.ioExecutor = ioExecutor;
this.connectionChooser = connectionChooser;
} }
public Payload listen(KeyPair localKeyPair) { Payload listen(KeyPair localKeyPair) {
LOG.info("Starting BQP listeners"); LOG.info("Starting BQP listeners");
// Derive commitment // Derive commitment
byte[] commitment = keyAgreementCrypto.deriveKeyCommitment( byte[] commitment = keyAgreementCrypto.deriveKeyCommitment(
@@ -80,8 +69,16 @@ class KeyAgreementConnector {
if (l != null) { if (l != null) {
TransportId id = plugin.getId(); TransportId id = plugin.getId();
descriptors.add(new TransportDescriptor(id, l.getDescriptor())); descriptors.add(new TransportDescriptor(id, l.getDescriptor()));
pending.add(connect.submit(new ReadableTask(l.listen()))); if (LOG.isLoggable(INFO)) LOG.info("Listening via " + id);
listeners.add(l); listeners.add(l);
ioExecutor.execute(() -> {
try {
connectionChooser.addConnection(l.accept());
} catch (IOException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
});
} }
} }
return new Payload(commitment, descriptors); return new Payload(commitment, descriptors);
@@ -89,152 +86,71 @@ class KeyAgreementConnector {
void stopListening() { void stopListening() {
LOG.info("Stopping BQP listeners"); LOG.info("Stopping BQP listeners");
for (KeyAgreementListener l : listeners) { stopped = true;
l.close(); for (KeyAgreementListener l : listeners) l.close();
}
listeners.clear(); listeners.clear();
connectionChooser.stop();
} }
@Nullable @Nullable
public KeyAgreementTransport connect(Payload remotePayload, public KeyAgreementTransport connect(Payload remotePayload, boolean alice) {
boolean alice) {
// Let the listeners know if we are Alice
this.connecting = true;
this.alice = alice;
long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT;
// Start connecting over supported transports // Start connecting over supported transports
LOG.info("Starting outgoing BQP connections"); if (LOG.isLoggable(INFO)) {
LOG.info("Starting outgoing BQP connections as "
+ (alice ? "Alice" : "Bob"));
}
for (TransportDescriptor d : remotePayload.getTransportDescriptors()) { for (TransportDescriptor d : remotePayload.getTransportDescriptors()) {
Plugin p = pluginManager.getPlugin(d.getId()); Plugin p = pluginManager.getPlugin(d.getId());
if (p instanceof DuplexPlugin) { if (p instanceof DuplexPlugin) {
if (LOG.isLoggable(INFO))
LOG.info("Connecting via " + d.getId());
DuplexPlugin plugin = (DuplexPlugin) p; DuplexPlugin plugin = (DuplexPlugin) p;
pending.add(connect.submit(new ReadableTask( byte[] commitment = remotePayload.getCommitment();
new ConnectorTask(plugin, remotePayload.getCommitment(), BdfList descriptor = d.getDescriptor();
d.getDescriptor(), end)))); ioExecutor.execute(() -> {
try {
KeyAgreementConnection c =
connect(plugin, commitment, descriptor);
if (c != null) connectionChooser.addConnection(c);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting to connect");
}
});
} }
} }
// Get chosen connection // Get chosen connection
KeyAgreementConnection chosen = null;
try { try {
long now = clock.currentTimeMillis(); KeyAgreementConnection chosen = connectionChooser.chooseConnection(
Future<KeyAgreementConnection> f = alice, CONNECTION_TIMEOUT);
connect.poll(end - now, MILLISECONDS); if (chosen == null) return null; // No suitable connection
if (f == null)
return null; // No task completed within the timeout.
chosen = f.get();
return new KeyAgreementTransport(chosen); return new KeyAgreementTransport(chosen);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for connection"); LOG.info("Interrupted while waiting for connection");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return null; return null;
} catch (ExecutionException | IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null; return null;
} finally { } finally {
stopListening(); stopListening();
// Close all other connections
closePending(chosen);
} }
} }
private void closePending(@Nullable KeyAgreementConnection chosen) { @Nullable
for (Future<KeyAgreementConnection> f : pending) { @IoExecutor
try { private KeyAgreementConnection connect(DuplexPlugin plugin,
if (f.cancel(true)) { byte[] commitment, BdfList descriptor) throws InterruptedException {
LOG.info("Cancelled task"); // Repeat attempts until we time out, get stopped, or get interrupted
} else if (!f.isCancelled()) { long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT;
KeyAgreementConnection c = f.get(); while (!stopped && clock.currentTimeMillis() < end) {
if (c != null && c != chosen)
tryToClose(c.getConnection(), false);
}
} catch (InterruptedException e) {
LOG.info("Interrupted while closing sockets");
Thread.currentThread().interrupt();
return;
} catch (ExecutionException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
}
private void tryToClose(DuplexTransportConnection conn, boolean exception) {
try {
if (LOG.isLoggable(INFO))
LOG.info("Closing connection, exception: " + exception);
conn.getReader().dispose(exception, true);
conn.getWriter().dispose(exception);
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
private class ConnectorTask implements Callable<KeyAgreementConnection> {
private final byte[] commitment;
private final BdfList descriptor;
private final long end;
private final DuplexPlugin plugin;
private ConnectorTask(DuplexPlugin plugin, byte[] commitment,
BdfList descriptor, long end) {
this.plugin = plugin;
this.commitment = commitment;
this.descriptor = descriptor;
this.end = end;
}
@Override
public KeyAgreementConnection call() throws Exception {
// Repeat attempts until we connect, get interrupted, or time out
while (true) {
long now = clock.currentTimeMillis();
if (now > end) throw new IOException();
DuplexTransportConnection conn = DuplexTransportConnection conn =
plugin.createKeyAgreementConnection(commitment, plugin.createKeyAgreementConnection(commitment, descriptor);
descriptor, end - now); if (conn != null)
if (conn != null) {
if (LOG.isLoggable(INFO))
LOG.info(plugin.getId().getString() +
": Outgoing connection");
return new KeyAgreementConnection(conn, plugin.getId()); return new KeyAgreementConnection(conn, plugin.getId());
}
// Wait 2s before retry (to circumvent transient failures) // Wait 2s before retry (to circumvent transient failures)
Thread.sleep(2000); Thread.sleep(2000);
} }
} return null;
}
private class ReadableTask
implements Callable<KeyAgreementConnection> {
private final Callable<KeyAgreementConnection> connectionTask;
private ReadableTask(Callable<KeyAgreementConnection> connectionTask) {
this.connectionTask = connectionTask;
}
@Override
public KeyAgreementConnection call() throws Exception {
KeyAgreementConnection c = connectionTask.call();
InputStream in = c.getConnection().getReader().getInputStream();
boolean waitingSent = false;
while (!alice && in.available() == 0) {
if (!waitingSent && connecting && !alice) {
// Bob waits here until Alice obtains his payload.
callbacks.connectionWaiting();
waitingSent = true;
}
if (LOG.isLoggable(INFO)) {
LOG.info(c.getTransportId().getString() +
": Waiting for connection");
}
Thread.sleep(1000);
}
if (!alice && LOG.isLoggable(INFO))
LOG.info(c.getTransportId().getString() + ": Data available");
return c;
}
} }
} }

View File

@@ -27,4 +27,10 @@ public class KeyAgreementModule {
PayloadParser providePayloadParser(BdfReaderFactory bdfReaderFactory) { PayloadParser providePayloadParser(BdfReaderFactory bdfReaderFactory) {
return new PayloadParserImpl(bdfReaderFactory); return new PayloadParserImpl(bdfReaderFactory);
} }
@Provides
ConnectionChooser provideConnectionChooser(
ConnectionChooserImpl connectionChooser) {
return connectionChooser;
}
} }

View File

@@ -99,7 +99,8 @@ class KeyAgreementProtocol {
PublicKey theirPublicKey; PublicKey theirPublicKey;
if (alice) { if (alice) {
sendKey(); sendKey();
// Alice waits here until Bob obtains her payload. // Alice waits here for Bob to scan her QR code, determine his
// role, receive her key and respond with his key
callbacks.connectionWaiting(); callbacks.connectionWaiting();
theirPublicKey = receiveKey(); theirPublicKey = receiveKey();
} else { } else {

View File

@@ -32,8 +32,7 @@ import static java.util.logging.Level.WARNING;
@MethodsNotNullByDefault @MethodsNotNullByDefault
@ParametersNotNullByDefault @ParametersNotNullByDefault
class KeyAgreementTaskImpl extends Thread implements class KeyAgreementTaskImpl extends Thread implements
KeyAgreementTask, KeyAgreementConnector.Callbacks, KeyAgreementTask, KeyAgreementProtocol.Callbacks {
KeyAgreementProtocol.Callbacks {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(KeyAgreementTaskImpl.class.getName()); Logger.getLogger(KeyAgreementTaskImpl.class.getName());
@@ -52,14 +51,15 @@ class KeyAgreementTaskImpl extends Thread implements
KeyAgreementTaskImpl(Clock clock, CryptoComponent crypto, KeyAgreementTaskImpl(Clock clock, CryptoComponent crypto,
KeyAgreementCrypto keyAgreementCrypto, EventBus eventBus, KeyAgreementCrypto keyAgreementCrypto, EventBus eventBus,
PayloadEncoder payloadEncoder, PluginManager pluginManager, PayloadEncoder payloadEncoder, PluginManager pluginManager,
@IoExecutor Executor ioExecutor) { @IoExecutor Executor ioExecutor,
ConnectionChooser connectionChooser) {
this.crypto = crypto; this.crypto = crypto;
this.keyAgreementCrypto = keyAgreementCrypto; this.keyAgreementCrypto = keyAgreementCrypto;
this.eventBus = eventBus; this.eventBus = eventBus;
this.payloadEncoder = payloadEncoder; this.payloadEncoder = payloadEncoder;
localKeyPair = crypto.generateAgreementKeyPair(); localKeyPair = crypto.generateAgreementKeyPair();
connector = new KeyAgreementConnector(this, clock, keyAgreementCrypto, connector = new KeyAgreementConnector(clock, keyAgreementCrypto,
pluginManager, ioExecutor); pluginManager, ioExecutor, connectionChooser);
} }
@Override @Override
@@ -73,10 +73,8 @@ class KeyAgreementTaskImpl extends Thread implements
@Override @Override
public synchronized void stopListening() { public synchronized void stopListening() {
if (localPayload != null) { if (localPayload != null) {
if (remotePayload == null) if (remotePayload == null) connector.stopListening();
connector.stopListening(); else interrupt();
else
interrupt();
} }
} }

View File

@@ -247,12 +247,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(transportId, tag); ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) { } catch (IOException | DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false); dispose(true, false);
return; return;
} }
if (ctx == null) { if (ctx == null) {
LOG.info("Unrecognised tag"); LOG.info("Unrecognised tag");
disposeReader(false, false); dispose(false, false);
return; return;
} }
contactId = ctx.getContactId(); contactId = ctx.getContactId();
@@ -263,10 +263,10 @@ class ConnectionManagerImpl implements ConnectionManager {
// Create and run the incoming session // Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader); incomingSession = createIncomingSession(ctx, reader);
incomingSession.run(); incomingSession.run();
disposeReader(false, true); dispose(false, true);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true); dispose(true, true);
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId, connectionRegistry.unregisterConnection(contactId, transportId,
true); true);
@@ -280,39 +280,28 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId); ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) { } catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true); dispose(true, true);
return; return;
} }
if (ctx == null) { if (ctx == null) {
LOG.warning("Could not allocate stream context"); LOG.warning("Could not allocate stream context");
disposeWriter(true); dispose(true, true);
return; return;
} }
try { try {
// Create and run the outgoing session // Create and run the outgoing session
outgoingSession = createDuplexOutgoingSession(ctx, writer); outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run(); outgoingSession.run();
disposeWriter(false); dispose(false, true);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true); dispose(true, true);
} }
} }
private void disposeReader(boolean exception, boolean recognised) { private void dispose(boolean exception, boolean recognised) {
if (exception && outgoingSession != null)
outgoingSession.interrupt();
try { try {
reader.dispose(exception, recognised); reader.dispose(exception, recognised);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if (exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception); writer.dispose(exception);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -346,12 +335,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(contactId, transportId); ctx = keyManager.getStreamContext(contactId, transportId);
} catch (DbException e) { } catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true); dispose(true);
return; return;
} }
if (ctx == null) { if (ctx == null) {
LOG.warning("Could not allocate stream context"); LOG.warning("Could not allocate stream context");
disposeWriter(true); dispose(true);
return; return;
} }
// Start the incoming session on another thread // Start the incoming session on another thread
@@ -360,10 +349,10 @@ class ConnectionManagerImpl implements ConnectionManager {
// Create and run the outgoing session // Create and run the outgoing session
outgoingSession = createDuplexOutgoingSession(ctx, writer); outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run(); outgoingSession.run();
disposeWriter(false); dispose(false);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true); dispose(true);
} }
} }
@@ -375,19 +364,19 @@ class ConnectionManagerImpl implements ConnectionManager {
ctx = keyManager.getStreamContext(transportId, tag); ctx = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) { } catch (IOException | DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false); dispose(true);
return; return;
} }
// Unrecognised tags are suspicious in this case // Unrecognised tags are suspicious in this case
if (ctx == null) { if (ctx == null) {
LOG.warning("Unrecognised tag for returning stream"); LOG.warning("Unrecognised tag for returning stream");
disposeReader(true, false); dispose(true);
return; return;
} }
// Check that the stream comes from the expected contact // Check that the stream comes from the expected contact
if (!ctx.getContactId().equals(contactId)) { if (!ctx.getContactId().equals(contactId)) {
LOG.warning("Wrong contact ID for returning stream"); LOG.warning("Wrong contact ID for returning stream");
disposeReader(true, true); dispose(true);
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId, connectionRegistry.registerConnection(contactId, transportId,
@@ -396,30 +385,20 @@ class ConnectionManagerImpl implements ConnectionManager {
// Create and run the incoming session // Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader); incomingSession = createIncomingSession(ctx, reader);
incomingSession.run(); incomingSession.run();
disposeReader(false, true); dispose(false);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true); dispose(true);
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId, connectionRegistry.unregisterConnection(contactId, transportId,
false); false);
} }
} }
private void disposeReader(boolean exception, boolean recognised) { private void dispose(boolean exception) {
if (exception && outgoingSession != null)
outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if (exception && incomingSession != null)
incomingSession.interrupt();
try { try {
// 'Recognised' is always true because we opened the connection
reader.dispose(exception, true);
writer.dispose(exception); writer.dispose(exception);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);

View File

@@ -0,0 +1,36 @@
package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
@NotNullByDefault
interface BluetoothConnectionManager {
/**
* Returns true if a contact connection can be opened without exceeding
* the connection limit. This method does not need to be called for key
* exchange connections.
*/
boolean canOpenConnection();
/**
* Passes a newly opened connection to the manager. The manager may close
* the new connection or another connection to stay within the connection
* limit.
* <p/>
* Returns false if the manager has closed the new connection (this will
* never be the case for key exchange connections).
*/
boolean connectionOpened(DuplexTransportConnection conn,
boolean isForKeyExchange);
/**
* Informs the manager that the given connection has been closed.
*/
void connectionClosed(DuplexTransportConnection conn);
/**
* Informs the manager that all connections have been closed.
*/
void allConnectionsClosed();
}

View File

@@ -0,0 +1,88 @@
package org.briarproject.bramble.plugin.bluetooth;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import java.io.IOException;
import java.util.LinkedList;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO;
@NotNullByDefault
@ThreadSafe
class BluetoothConnectionManagerImpl implements BluetoothConnectionManager {
private static final int MAX_OPEN_CONNECTIONS = 5;
private static final Logger LOG =
Logger.getLogger(BluetoothConnectionManagerImpl.class.getName());
private final Object lock = new Object();
private final LinkedList<DuplexTransportConnection> connections =
new LinkedList<>(); // Locking: lock
@Override
public boolean canOpenConnection() {
synchronized (lock) {
int open = connections.size();
if (LOG.isLoggable(INFO)) LOG.info(open + " open connections");
return open < MAX_OPEN_CONNECTIONS;
}
}
@Override
public boolean connectionOpened(DuplexTransportConnection conn,
boolean isForKeyExchange) {
DuplexTransportConnection close = null;
synchronized (lock) {
int open = connections.size();
boolean accept = isForKeyExchange || open < MAX_OPEN_CONNECTIONS;
if (accept) {
if (LOG.isLoggable(INFO))
LOG.info("Accepting connection, " + (open + 1) + " open");
connections.add(conn);
if (open == MAX_OPEN_CONNECTIONS) {
LOG.info("Closing old connection to stay within limit");
close = connections.poll();
}
} else {
if (LOG.isLoggable(INFO))
LOG.info("Refusing connection, " + open + " open");
close = conn;
}
}
if (close != null) tryToClose(close);
return close != conn;
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getReader().dispose(false, true);
conn.getWriter().dispose(false);
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e);
}
}
@Override
public void connectionClosed(DuplexTransportConnection conn) {
synchronized (lock) {
connections.remove(conn);
if (LOG.isLoggable(INFO)) {
int open = connections.size();
LOG.info("Connection closed, " + open + " open");
}
}
}
@Override
public void allConnectionsClosed() {
synchronized (lock) {
connections.clear();
LOG.info("All connections closed");
}
}
}

View File

@@ -28,7 +28,6 @@ import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -52,6 +51,8 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(BluetoothPlugin.class.getName()); Logger.getLogger(BluetoothPlugin.class.getName());
protected final BluetoothConnectionManager connectionManager;
private final Executor ioExecutor; private final Executor ioExecutor;
private final SecureRandom secureRandom; private final SecureRandom secureRandom;
private final Backoff backoff; private final Backoff backoff;
@@ -92,8 +93,10 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
abstract DuplexTransportConnection connectTo(String address, String uuid) abstract DuplexTransportConnection connectTo(String address, String uuid)
throws IOException; throws IOException;
BluetoothPlugin(Executor ioExecutor, SecureRandom secureRandom, BluetoothPlugin(BluetoothConnectionManager connectionManager,
Executor ioExecutor, SecureRandom secureRandom,
Backoff backoff, DuplexPluginCallback callback, int maxLatency) { Backoff backoff, DuplexPluginCallback callback, int maxLatency) {
this.connectionManager = connectionManager;
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.secureRandom = secureRandom; this.secureRandom = secureRandom;
this.backoff = backoff; this.backoff = backoff;
@@ -111,6 +114,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
void onAdapterDisabled() { void onAdapterDisabled() {
LOG.info("Bluetooth disabled"); LOG.info("Bluetooth disabled");
tryToClose(socket); tryToClose(socket);
connectionManager.allConnectionsClosed();
callback.transportDisabled(); callback.transportDisabled();
} }
@@ -214,6 +218,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
return; return;
} }
backoff.reset(); backoff.reset();
if (connectionManager.connectionOpened(conn, false))
callback.incomingConnectionCreated(conn); callback.incomingConnectionCreated(conn);
if (!running) return; if (!running) return;
} }
@@ -258,9 +263,14 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
if (StringUtils.isNullOrEmpty(uuid)) continue; if (StringUtils.isNullOrEmpty(uuid)) continue;
ioExecutor.execute(() -> { ioExecutor.execute(() -> {
if (!isRunning() || !shouldAllowContactConnections()) return; if (!isRunning() || !shouldAllowContactConnections()) return;
if (!connectionManager.canOpenConnection()) {
LOG.info("Not connecting, too many open connections");
return;
}
DuplexTransportConnection conn = connect(address, uuid); DuplexTransportConnection conn = connect(address, uuid);
if (conn != null) { if (conn != null) {
backoff.reset(); backoff.reset();
if (connectionManager.connectionOpened(conn, false))
callback.outgoingConnectionCreated(c, conn); callback.outgoingConnectionCreated(c, conn);
} }
}); });
@@ -301,12 +311,19 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
@Override @Override
public DuplexTransportConnection createConnection(ContactId c) { public DuplexTransportConnection createConnection(ContactId c) {
if (!isRunning() || !shouldAllowContactConnections()) return null; if (!isRunning() || !shouldAllowContactConnections()) return null;
if (!connectionManager.canOpenConnection()) {
LOG.info("Not connecting, too many open connections");
return null;
}
TransportProperties p = callback.getRemoteProperties(c); TransportProperties p = callback.getRemoteProperties(c);
String address = p.get(PROP_ADDRESS); String address = p.get(PROP_ADDRESS);
if (StringUtils.isNullOrEmpty(address)) return null; if (StringUtils.isNullOrEmpty(address)) return null;
String uuid = p.get(PROP_UUID); String uuid = p.get(PROP_UUID);
if (StringUtils.isNullOrEmpty(uuid)) return null; if (StringUtils.isNullOrEmpty(uuid)) return null;
return connect(address, uuid); DuplexTransportConnection conn = connect(address, uuid);
if (conn == null) return null;
// TODO: Why don't we reset the backoff here?
return connectionManager.connectionOpened(conn, false) ? conn : null;
} }
@Override @Override
@@ -343,7 +360,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
if (!isRunning()) return null; if (!isRunning()) return null;
String address; String address;
try { try {
@@ -356,7 +373,10 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
String uuid = UUID.nameUUIDFromBytes(commitment).toString(); String uuid = UUID.nameUUIDFromBytes(commitment).toString();
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))
LOG.info("Connecting to key agreement UUID " + uuid); LOG.info("Connecting to key agreement UUID " + uuid);
return connect(address, uuid); DuplexTransportConnection conn = connect(address, uuid);
// The connection limit doesn't apply to key agreement
if (conn != null) connectionManager.connectionOpened(conn, true);
return conn;
} }
private String parseAddress(BdfList descriptor) throws FormatException { private String parseAddress(BdfList descriptor) throws FormatException {
@@ -406,13 +426,12 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
} }
@Override @Override
public Callable<KeyAgreementConnection> listen() { public KeyAgreementConnection accept() throws IOException {
return () -> {
DuplexTransportConnection conn = acceptConnection(ss); DuplexTransportConnection conn = acceptConnection(ss);
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
LOG.info(ID.getString() + ": Incoming connection"); // The connection limit doesn't apply to key agreement
connectionManager.connectionOpened(conn, true);
return new KeyAgreementConnection(conn, ID); return new KeyAgreementConnection(conn, ID);
};
} }
@Override @Override

View File

@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -224,7 +223,7 @@ class LanTcpPlugin extends TcpPlugin {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
if (!isRunning()) return null; if (!isRunning()) return null;
InetSocketAddress remote; InetSocketAddress remote;
try { try {
@@ -283,14 +282,11 @@ class LanTcpPlugin extends TcpPlugin {
} }
@Override @Override
public Callable<KeyAgreementConnection> listen() { public KeyAgreementConnection accept() throws IOException {
return () -> {
Socket s = ss.accept(); Socket s = ss.accept();
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
LOG.info(ID.getString() + ": Incoming connection"); return new KeyAgreementConnection(new TcpTransportConnection(
return new KeyAgreementConnection( LanTcpPlugin.this, s), ID);
new TcpTransportConnection(LanTcpPlugin.this, s), ID);
};
} }
@Override @Override

View File

@@ -297,7 +297,7 @@ abstract class TcpPlugin implements DuplexPlugin {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin.tcp;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.data.BdfList; import org.briarproject.bramble.api.data.BdfList;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.keyagreement.KeyAgreementListener; import org.briarproject.bramble.api.keyagreement.KeyAgreementListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.Backoff; import org.briarproject.bramble.api.plugin.Backoff;
@@ -26,11 +25,9 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
@@ -195,9 +192,16 @@ public class LanTcpPluginTest extends BrambleTestCase {
KeyAgreementListener kal = KeyAgreementListener kal =
plugin.createKeyAgreementListener(new byte[COMMIT_LENGTH]); plugin.createKeyAgreementListener(new byte[COMMIT_LENGTH]);
assertNotNull(kal); assertNotNull(kal);
Callable<KeyAgreementConnection> c = kal.listen(); CountDownLatch latch = new CountDownLatch(1);
FutureTask<KeyAgreementConnection> f = new FutureTask<>(c); AtomicBoolean error = new AtomicBoolean(false);
new Thread(f).start(); new Thread(() -> {
try {
kal.accept();
latch.countDown();
} catch (IOException e) {
error.set(true);
}
}).start();
// The plugin should have bound a socket and stored the port number // The plugin should have bound a socket and stored the port number
BdfList descriptor = kal.getDescriptor(); BdfList descriptor = kal.getDescriptor();
assertEquals(3, descriptor.size()); assertEquals(3, descriptor.size());
@@ -213,10 +217,12 @@ public class LanTcpPluginTest extends BrambleTestCase {
InetSocketAddress socketAddr = new InetSocketAddress(addr, port); InetSocketAddress socketAddr = new InetSocketAddress(addr, port);
Socket s = new Socket(); Socket s = new Socket();
s.connect(socketAddr, 100); s.connect(socketAddr, 100);
assertNotNull(f.get(5, SECONDS)); // Check that the connection was accepted
assertTrue(latch.await(5, SECONDS));
assertFalse(error.get());
// Clean up
s.close(); s.close();
kal.close(); kal.close();
// Stop the plugin
plugin.stop(); plugin.stop();
} }
@@ -262,7 +268,7 @@ public class LanTcpPluginTest extends BrambleTestCase {
descriptor.add(local.getPort()); descriptor.add(local.getPort());
// Connect to the port // Connect to the port
DuplexTransportConnection d = plugin.createKeyAgreementConnection( DuplexTransportConnection d = plugin.createKeyAgreementConnection(
new byte[COMMIT_LENGTH], descriptor, 5000); new byte[COMMIT_LENGTH], descriptor);
assertNotNull(d); assertNotNull(d);
// Check that the connection was accepted // Check that the connection was accepted
assertTrue(latch.await(5, SECONDS)); assertTrue(latch.await(5, SECONDS));

View File

@@ -31,9 +31,11 @@ class JavaBluetoothPlugin extends BluetoothPlugin<StreamConnectionNotifier> {
// Non-null if the plugin started successfully // Non-null if the plugin started successfully
private volatile LocalDevice localDevice = null; private volatile LocalDevice localDevice = null;
JavaBluetoothPlugin(Executor ioExecutor, SecureRandom secureRandom, JavaBluetoothPlugin(BluetoothConnectionManager connectionManager,
Executor ioExecutor, SecureRandom secureRandom,
Backoff backoff, DuplexPluginCallback callback, int maxLatency) { Backoff backoff, DuplexPluginCallback callback, int maxLatency) {
super(ioExecutor, secureRandom, backoff, callback, maxLatency); super(connectionManager, ioExecutor, secureRandom, backoff, callback,
maxLatency);
} }
@Override @Override
@@ -110,6 +112,6 @@ class JavaBluetoothPlugin extends BluetoothPlugin<StreamConnectionNotifier> {
} }
private DuplexTransportConnection wrapSocket(StreamConnection s) { private DuplexTransportConnection wrapSocket(StreamConnection s) {
return new JavaBluetoothTransportConnection(this, s); return new JavaBluetoothTransportConnection(this, connectionManager, s);
} }
} }

View File

@@ -51,10 +51,12 @@ public class JavaBluetoothPluginFactory implements DuplexPluginFactory {
@Override @Override
public DuplexPlugin createPlugin(DuplexPluginCallback callback) { public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
BluetoothConnectionManager connectionManager =
new BluetoothConnectionManagerImpl();
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL, Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE); MAX_POLLING_INTERVAL, BACKOFF_BASE);
JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(ioExecutor, JavaBluetoothPlugin plugin = new JavaBluetoothPlugin(connectionManager,
secureRandom, backoff, callback, MAX_LATENCY); ioExecutor, secureRandom, backoff, callback, MAX_LATENCY);
eventBus.addListener(plugin); eventBus.addListener(plugin);
return plugin; return plugin;
} }

View File

@@ -14,11 +14,15 @@ import javax.microedition.io.StreamConnection;
class JavaBluetoothTransportConnection class JavaBluetoothTransportConnection
extends AbstractDuplexTransportConnection { extends AbstractDuplexTransportConnection {
private final BluetoothConnectionManager connectionManager;
private final StreamConnection stream; private final StreamConnection stream;
JavaBluetoothTransportConnection(Plugin plugin, StreamConnection stream) { JavaBluetoothTransportConnection(Plugin plugin,
BluetoothConnectionManager connectionManager,
StreamConnection stream) {
super(plugin); super(plugin);
this.stream = stream; this.stream = stream;
this.connectionManager = connectionManager;
} }
@Override @Override
@@ -33,6 +37,10 @@ class JavaBluetoothTransportConnection
@Override @Override
protected void closeConnection(boolean exception) throws IOException { protected void closeConnection(boolean exception) throws IOException {
try {
stream.close(); stream.close();
} finally {
connectionManager.connectionClosed(this);
}
} }
} }

View File

@@ -177,7 +177,7 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }