From cbb65ec80745f7e2c319d157902a27e8e8a6c827 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 27 Feb 2018 17:51:25 +0000 Subject: [PATCH] Refactor key agreement connection choosing. --- .../bramble/plugin/tor/TorPlugin.java | 2 +- .../keyagreement/KeyAgreementListener.java | 10 +- .../api/plugin/duplex/DuplexPlugin.java | 4 +- .../keyagreement/ConnectionChooser.java | 39 +++ .../keyagreement/ConnectionChooserImpl.java | 153 ++++++++++++ .../keyagreement/KeyAgreementConnector.java | 228 ++++-------------- .../keyagreement/KeyAgreementModule.java | 6 + .../keyagreement/KeyAgreementProtocol.java | 3 +- .../keyagreement/KeyAgreementTaskImpl.java | 16 +- .../plugin/bluetooth/BluetoothPlugin.java | 20 +- .../bramble/plugin/tcp/LanTcpPlugin.java | 16 +- .../bramble/plugin/tcp/TcpPlugin.java | 2 +- .../bramble/plugin/tcp/LanTcpPluginTest.java | 24 +- .../bramble/plugin/modem/ModemPlugin.java | 2 +- 14 files changed, 298 insertions(+), 227 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooser.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooserImpl.java diff --git a/bramble-android/src/main/java/org/briarproject/bramble/plugin/tor/TorPlugin.java b/bramble-android/src/main/java/org/briarproject/bramble/plugin/tor/TorPlugin.java index 67e48d3da..7ccaa9a27 100644 --- a/bramble-android/src/main/java/org/briarproject/bramble/plugin/tor/TorPlugin.java +++ b/bramble-android/src/main/java/org/briarproject/bramble/plugin/tor/TorPlugin.java @@ -614,7 +614,7 @@ class TorPlugin implements DuplexPlugin, EventHandler, EventListener { @Override public DuplexTransportConnection createKeyAgreementConnection( - byte[] commitment, BdfList descriptor, long timeout) { + byte[] commitment, BdfList descriptor) { throw new UnsupportedOperationException(); } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/keyagreement/KeyAgreementListener.java b/bramble-api/src/main/java/org/briarproject/bramble/api/keyagreement/KeyAgreementListener.java index 42fda82f9..8c520b47e 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/keyagreement/KeyAgreementListener.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/keyagreement/KeyAgreementListener.java @@ -2,7 +2,7 @@ package org.briarproject.bramble.api.keyagreement; import org.briarproject.bramble.api.data.BdfList; -import java.util.concurrent.Callable; +import java.io.IOException; /** * An class for managing a particular key agreement listener. @@ -24,11 +24,11 @@ public abstract class KeyAgreementListener { } /** - * Starts listening for incoming connections, and returns a Callable that - * will return a KeyAgreementConnection when an incoming connection is - * received. + * Blocks until an incoming connection is received and returns it. + * + * @throws IOException if an error occurs or {@link #close()} is called. */ - public abstract Callable listen(); + public abstract KeyAgreementConnection accept() throws IOException; /** * Closes the underlying server socket. diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java index 83fc64248..8ab6c4fe4 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java @@ -36,9 +36,9 @@ public interface DuplexPlugin extends Plugin { /** * Attempts to connect to the remote peer specified in the given descriptor. - * Returns null if no connection can be established within the given time. + * Returns null if no connection can be established. */ @Nullable DuplexTransportConnection createKeyAgreementConnection( - byte[] remoteCommitment, BdfList descriptor, long timeout); + byte[] remoteCommitment, BdfList descriptor); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooser.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooser.java new file mode 100644 index 000000000..c29f4b37f --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooser.java @@ -0,0 +1,39 @@ +package org.briarproject.bramble.keyagreement; + +import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection; + +import javax.annotation.Nullable; + +interface ConnectionChooser { + + /** + * Adds a connection to the set of connections that may be chosen. If + * {@link #stop()} has already been called, the connection will be closed + * immediately. + */ + void addConnection(KeyAgreementConnection c); + + /** + * Chooses one of the connections passed to + * {@link #addConnection(KeyAgreementConnection)} and returns it, + * waiting up to the given amount of time for a suitable connection to + * become available. Returns null if the time elapses without a suitable + * connection becoming available. + * + * @param alice true if the local party is Alice + * @param timeout the timeout in milliseconds + * @throws InterruptedException if the thread is interrupted while waiting + * for a suitable connection to become available + */ + @Nullable + KeyAgreementConnection chooseConnection(boolean alice, long timeout) + throws InterruptedException; + + /** + * Stops the chooser from accepting new connections. Closes any connections + * already passed to {@link #addConnection(KeyAgreementConnection)} + * and not chosen. Any connections subsequently passed to + * {@link #addConnection(KeyAgreementConnection)} will be closed. + */ + void stop(); +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooserImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooserImpl.java new file mode 100644 index 000000000..c5a0de383 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/ConnectionChooserImpl.java @@ -0,0 +1,153 @@ +package org.briarproject.bramble.keyagreement; + +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection; +import org.briarproject.bramble.api.keyagreement.event.KeyAgreementWaitingEvent; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.system.Clock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Logger; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; + +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; + +@NotNullByDefault +@ThreadSafe +class ConnectionChooserImpl implements ConnectionChooser { + + private static final Logger LOG = + Logger.getLogger(ConnectionChooserImpl.class.getName()); + + private final EventBus eventBus; + private final Clock clock; + private final Object lock = new Object(); + private final List connections = + new ArrayList<>(); // Locking: lock + private boolean stopped = false; // Locking: lock + + @Inject + ConnectionChooserImpl(EventBus eventBus, Clock clock) { + this.eventBus = eventBus; + this.clock = clock; + } + + @Override + public void addConnection(KeyAgreementConnection conn) { + boolean close = false; + synchronized (lock) { + if (stopped) { + // Already stopped, close the connection + close = true; + } else { + connections.add(conn); + lock.notifyAll(); + } + } + if (close) tryToClose(conn.getConnection()); + } + + @Nullable + @Override + public KeyAgreementConnection chooseConnection(boolean alice, long timeout) + throws InterruptedException { + if (alice) return chooseConnectionAlice(timeout); + else return chooseConnectionBob(timeout); + } + + @Nullable + private KeyAgreementConnection chooseConnectionAlice(long timeout) + throws InterruptedException { + LOG.info("Choosing connection for Alice"); + long now = clock.currentTimeMillis(); + long end = now + timeout; + KeyAgreementConnection chosen; + synchronized (lock) { + // Wait until we're stopped, a connection is added, or we time out + while (!stopped && connections.isEmpty() && now < end) { + lock.wait(end - now); + now = clock.currentTimeMillis(); + } + if (connections.isEmpty()) { + LOG.info("No suitable connection for Alice"); + return null; + } + // Choose the first connection + chosen = connections.remove(0); + } + if (LOG.isLoggable(INFO)) + LOG.info("Choosing " + chosen.getTransportId()); + return chosen; + } + + @Nullable + private KeyAgreementConnection chooseConnectionBob(long timeout) + throws InterruptedException { + LOG.info("Choosing connection for Bob"); + // Bob waits here for Alice to scan his QR code, determine her role, + // choose a connection and send her key + eventBus.broadcast(new KeyAgreementWaitingEvent()); + long now = clock.currentTimeMillis(); + long end = now + timeout; + synchronized (lock) { + while (!stopped && now < end) { + // Check whether any connection has data available + Iterator it = connections.iterator(); + while (it.hasNext()) { + KeyAgreementConnection c = it.next(); + try { + int available = c.getConnection().getReader() + .getInputStream().available(); + if (available > 0) { + if (LOG.isLoggable(INFO)) + LOG.info("Choosing " + c.getTransportId()); + it.remove(); + return c; + } + } catch (IOException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + tryToClose(c.getConnection()); + it.remove(); + } + } + // Wait for 1 second before checking again + lock.wait(Math.min(1000, end - now)); + now = clock.currentTimeMillis(); + } + } + LOG.info("No suitable connection for Bob"); + return null; + } + + @Override + public void stop() { + List unused; + synchronized (lock) { + stopped = true; + unused = new ArrayList<>(connections); + connections.clear(); + lock.notifyAll(); + } + if (LOG.isLoggable(INFO)) + LOG.info("Closing " + unused.size() + " unused connections"); + for (KeyAgreementConnection c : unused) tryToClose(c.getConnection()); + } + + private void tryToClose(DuplexTransportConnection conn) { + try { + conn.getReader().dispose(false, true); + conn.getWriter().dispose(false); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) LOG.info(e.toString()); + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java index 50f918e58..b322a231b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection; import org.briarproject.bramble.api.keyagreement.KeyAgreementListener; import org.briarproject.bramble.api.keyagreement.Payload; import org.briarproject.bramble.api.keyagreement.TransportDescriptor; +import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.Plugin; import org.briarproject.bramble.api.plugin.PluginManager; @@ -16,20 +17,14 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.system.Clock; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; import java.util.logging.Logger; import javax.annotation.Nullable; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CONNECTION_TIMEOUT; @@ -37,40 +32,31 @@ import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CO @NotNullByDefault class KeyAgreementConnector { - interface Callbacks { - void connectionWaiting(); - } - private static final Logger LOG = Logger.getLogger(KeyAgreementConnector.class.getName()); - private final Callbacks callbacks; private final Clock clock; private final KeyAgreementCrypto keyAgreementCrypto; private final PluginManager pluginManager; private final Executor ioExecutor; - private final CompletionService connect; + private final ConnectionChooser connectionChooser; - private final List listeners = new ArrayList<>(); - private final List> pending = - new ArrayList<>(); + private final List listeners = + new CopyOnWriteArrayList<>(); - private volatile boolean connecting = false; - private volatile boolean alice = false; private volatile boolean stopped = false; - KeyAgreementConnector(Callbacks callbacks, Clock clock, - KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager, - Executor ioExecutor) { - this.callbacks = callbacks; + KeyAgreementConnector(Clock clock, KeyAgreementCrypto keyAgreementCrypto, + PluginManager pluginManager, Executor ioExecutor, + ConnectionChooser connectionChooser) { this.clock = clock; this.keyAgreementCrypto = keyAgreementCrypto; this.pluginManager = pluginManager; this.ioExecutor = ioExecutor; - connect = new ExecutorCompletionService<>(ioExecutor); + this.connectionChooser = connectionChooser; } - public Payload listen(KeyPair localKeyPair) { + Payload listen(KeyPair localKeyPair) { LOG.info("Starting BQP listeners"); // Derive commitment byte[] commitment = keyAgreementCrypto.deriveKeyCommitment( @@ -83,10 +69,16 @@ class KeyAgreementConnector { if (l != null) { TransportId id = plugin.getId(); descriptors.add(new TransportDescriptor(id, l.getDescriptor())); - if (LOG.isLoggable(INFO)) - LOG.info("Creating incoming task for " + id); - pending.add(connect.submit(new ReadableTask(l.listen()))); + if (LOG.isLoggable(INFO)) LOG.info("Listening via " + id); listeners.add(l); + ioExecutor.execute(() -> { + try { + connectionChooser.addConnection(l.accept()); + } catch (IOException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + } + }); } } return new Payload(commitment, descriptors); @@ -95,19 +87,13 @@ class KeyAgreementConnector { void stopListening() { LOG.info("Stopping BQP listeners"); stopped = true; - for (KeyAgreementListener l : listeners) { - l.close(); - } + for (KeyAgreementListener l : listeners) l.close(); listeners.clear(); + connectionChooser.stop(); } @Nullable public KeyAgreementTransport connect(Payload remotePayload, boolean alice) { - // Let the ReadableTasks know if we are Alice - this.connecting = true; - this.alice = alice; - long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT; - // Start connecting over supported transports if (LOG.isLoggable(INFO)) { LOG.info("Starting outgoing BQP connections as " @@ -116,165 +102,55 @@ class KeyAgreementConnector { for (TransportDescriptor d : remotePayload.getTransportDescriptors()) { Plugin p = pluginManager.getPlugin(d.getId()); if (p instanceof DuplexPlugin) { - DuplexPlugin plugin = (DuplexPlugin) p; if (LOG.isLoggable(INFO)) - LOG.info("Creating outgoing task for " + d.getId()); - pending.add(connect.submit(new ReadableTask( - new ConnectorTask(plugin, remotePayload.getCommitment(), - d.getDescriptor(), end)))); + LOG.info("Connecting via " + d.getId()); + DuplexPlugin plugin = (DuplexPlugin) p; + byte[] commitment = remotePayload.getCommitment(); + BdfList descriptor = d.getDescriptor(); + ioExecutor.execute(() -> { + try { + KeyAgreementConnection c = + connect(plugin, commitment, descriptor); + if (c != null) connectionChooser.addConnection(c); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting to connect"); + } + }); } } // Get chosen connection - KeyAgreementConnection chosen = null; try { - long now = clock.currentTimeMillis(); - Future f = - connect.poll(end - now, MILLISECONDS); - if (f == null) return null; // No task completed within the timeout - chosen = f.get(); - if (chosen == null) return null; // We've been stopped + KeyAgreementConnection chosen = connectionChooser.chooseConnection( + alice, CONNECTION_TIMEOUT); + if (chosen == null) return null; // No suitable connection return new KeyAgreementTransport(chosen); } catch (InterruptedException e) { LOG.info("Interrupted while waiting for connection"); Thread.currentThread().interrupt(); return null; - } catch (ExecutionException | IOException e) { + } catch (IOException e) { if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); return null; } finally { stopListening(); - // Close all other connections - closePending(chosen); } } - private void closePending(@Nullable KeyAgreementConnection chosen) { - List> unfinished = new ArrayList<>(); - try { - for (Future f : pending) { - if (f.isDone()) { - LOG.info("Task is already done"); - closeIfNotChosen(f, chosen); - } else { - LOG.info("Task is not done"); - unfinished.add(f); - } - } - } catch (InterruptedException e) { - LOG.info("Interrupted while closing connections"); - Thread.currentThread().interrupt(); - } - for (Future f : unfinished) { - ioExecutor.execute(() -> { - try { - closeIfNotChosen(f, chosen); - } catch (InterruptedException e) { - LOG.info("Interrupted while closing connections"); - } - }); - } - } - - private void closeIfNotChosen(Future f, - @Nullable KeyAgreementConnection chosen) - throws InterruptedException { - try { - KeyAgreementConnection c = f.get(); - if (c == null) { - LOG.info("Result is null"); - } else if (c == chosen) { - LOG.info("Not closing chosen connection"); - } else { - LOG.info("Closing unchosen connection"); - tryToClose(c.getConnection()); - } - } catch (ExecutionException e) { - if (LOG.isLoggable(INFO)) - LOG.info("Task threw exception: " + e); - } - } - - private void tryToClose(DuplexTransportConnection conn) { - try { - conn.getReader().dispose(false, true); - conn.getWriter().dispose(false); - } catch (IOException e) { - if (LOG.isLoggable(INFO)) LOG.info(e.toString()); - } - } - - private class ConnectorTask implements Callable { - - private final byte[] commitment; - private final BdfList descriptor; - private final long end; - private final DuplexPlugin plugin; - - private ConnectorTask(DuplexPlugin plugin, byte[] commitment, - BdfList descriptor, long end) { - this.plugin = plugin; - this.commitment = commitment; - this.descriptor = descriptor; - this.end = end; - } - - @Override - @Nullable - public KeyAgreementConnection call() throws Exception { - // Repeat attempts until we connect, get interrupted, or time out - while (!stopped) { - long now = clock.currentTimeMillis(); - if (now >= end) throw new IOException("Timed out"); - DuplexTransportConnection conn = - plugin.createKeyAgreementConnection(commitment, - descriptor, end - now); - if (conn != null) { - if (LOG.isLoggable(INFO)) - LOG.info(plugin.getId() + ": Outgoing connection"); - return new KeyAgreementConnection(conn, plugin.getId()); - } - // Wait 2s before retry (to circumvent transient failures) - Thread.sleep(2000); - } - return null; - } - } - - private class ReadableTask implements Callable { - - private final Callable connectionTask; - - private ReadableTask(Callable connectionTask) { - this.connectionTask = connectionTask; - } - - @Override - @Nullable - public KeyAgreementConnection call() throws Exception { - KeyAgreementConnection c = connectionTask.call(); - if (c == null) return null; - InputStream in = c.getConnection().getReader().getInputStream(); - boolean waitingSent = false; - try { - while (!stopped && !alice && in.available() == 0) { - if (!waitingSent && connecting && !alice) { - // Bob waits here until Alice obtains his payload. - callbacks.connectionWaiting(); - waitingSent = true; - } - if (LOG.isLoggable(INFO)) - LOG.info(c.getTransportId() + ": Waiting for data"); - Thread.sleep(1000); - } - } catch (IOException | InterruptedException e) { - if (LOG.isLoggable(INFO)) LOG.info("Closing connection: " + e); - tryToClose(c.getConnection()); - throw e; - } - if (!stopped && !alice && LOG.isLoggable(INFO)) - LOG.info(c.getTransportId() + ": Data available"); - return c; + @Nullable + @IoExecutor + private KeyAgreementConnection connect(DuplexPlugin plugin, + byte[] commitment, BdfList descriptor) throws InterruptedException { + // Repeat attempts until we time out, get stopped, or get interrupted + long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT; + while (!stopped && clock.currentTimeMillis() < end) { + DuplexTransportConnection conn = + plugin.createKeyAgreementConnection(commitment, descriptor); + if (conn != null) + return new KeyAgreementConnection(conn, plugin.getId()); + // Wait 2s before retry (to circumvent transient failures) + Thread.sleep(2000); } + return null; } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementModule.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementModule.java index e7ec82dab..819832820 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementModule.java @@ -27,4 +27,10 @@ public class KeyAgreementModule { PayloadParser providePayloadParser(BdfReaderFactory bdfReaderFactory) { return new PayloadParserImpl(bdfReaderFactory); } + + @Provides + ConnectionChooser provideConnectionChooser( + ConnectionChooserImpl connectionChooser) { + return connectionChooser; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementProtocol.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementProtocol.java index ab5ea4f92..5825c9500 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementProtocol.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementProtocol.java @@ -99,7 +99,8 @@ class KeyAgreementProtocol { PublicKey theirPublicKey; if (alice) { sendKey(); - // Alice waits here until Bob obtains her payload. + // Alice waits here for Bob to scan her QR code, determine his + // role, receive her key and respond with his key callbacks.connectionWaiting(); theirPublicKey = receiveKey(); } else { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTaskImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTaskImpl.java index e0d97313d..b06522d81 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTaskImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTaskImpl.java @@ -32,8 +32,7 @@ import static java.util.logging.Level.WARNING; @MethodsNotNullByDefault @ParametersNotNullByDefault class KeyAgreementTaskImpl extends Thread implements - KeyAgreementTask, KeyAgreementConnector.Callbacks, - KeyAgreementProtocol.Callbacks { + KeyAgreementTask, KeyAgreementProtocol.Callbacks { private static final Logger LOG = Logger.getLogger(KeyAgreementTaskImpl.class.getName()); @@ -52,14 +51,15 @@ class KeyAgreementTaskImpl extends Thread implements KeyAgreementTaskImpl(Clock clock, CryptoComponent crypto, KeyAgreementCrypto keyAgreementCrypto, EventBus eventBus, PayloadEncoder payloadEncoder, PluginManager pluginManager, - @IoExecutor Executor ioExecutor) { + @IoExecutor Executor ioExecutor, + ConnectionChooser connectionChooser) { this.crypto = crypto; this.keyAgreementCrypto = keyAgreementCrypto; this.eventBus = eventBus; this.payloadEncoder = payloadEncoder; localKeyPair = crypto.generateAgreementKeyPair(); - connector = new KeyAgreementConnector(this, clock, keyAgreementCrypto, - pluginManager, ioExecutor); + connector = new KeyAgreementConnector(clock, keyAgreementCrypto, + pluginManager, ioExecutor, connectionChooser); } @Override @@ -73,10 +73,8 @@ class KeyAgreementTaskImpl extends Thread implements @Override public synchronized void stopListening() { if (localPayload != null) { - if (remotePayload == null) - connector.stopListening(); - else - interrupt(); + if (remotePayload == null) connector.stopListening(); + else interrupt(); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothPlugin.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothPlugin.java index 594bcff94..5c14bc761 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothPlugin.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothPlugin.java @@ -28,7 +28,6 @@ import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -324,7 +323,7 @@ abstract class BluetoothPlugin implements DuplexPlugin, EventListener { DuplexTransportConnection conn = connect(address, uuid); if (conn == null) return null; // TODO: Why don't we reset the backoff here? - return connectionManager.connectionOpened(conn, false) ? conn : null; + return connectionManager.connectionOpened(conn, false) ? conn : null; } @Override @@ -361,7 +360,7 @@ abstract class BluetoothPlugin implements DuplexPlugin, EventListener { @Override public DuplexTransportConnection createKeyAgreementConnection( - byte[] commitment, BdfList descriptor, long timeout) { + byte[] commitment, BdfList descriptor) { if (!isRunning()) return null; String address; try { @@ -427,15 +426,12 @@ abstract class BluetoothPlugin implements DuplexPlugin, EventListener { } @Override - public Callable listen() { - return () -> { - DuplexTransportConnection conn = acceptConnection(ss); - if (LOG.isLoggable(INFO)) - LOG.info(ID.getString() + ": Incoming connection"); - // The connection limit doesn't apply to key agreement - connectionManager.connectionOpened(conn, true); - return new KeyAgreementConnection(conn, ID); - }; + public KeyAgreementConnection accept() throws IOException { + DuplexTransportConnection conn = acceptConnection(ss); + if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection"); + // The connection limit doesn't apply to key agreement + connectionManager.connectionOpened(conn, true); + return new KeyAgreementConnection(conn, ID); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/LanTcpPlugin.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/LanTcpPlugin.java index 37c5935f9..450367381 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/LanTcpPlugin.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/LanTcpPlugin.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.logging.Logger; @@ -224,7 +223,7 @@ class LanTcpPlugin extends TcpPlugin { @Override public DuplexTransportConnection createKeyAgreementConnection( - byte[] commitment, BdfList descriptor, long timeout) { + byte[] commitment, BdfList descriptor) { if (!isRunning()) return null; InetSocketAddress remote; try { @@ -283,14 +282,11 @@ class LanTcpPlugin extends TcpPlugin { } @Override - public Callable listen() { - return () -> { - Socket s = ss.accept(); - if (LOG.isLoggable(INFO)) - LOG.info(ID.getString() + ": Incoming connection"); - return new KeyAgreementConnection( - new TcpTransportConnection(LanTcpPlugin.this, s), ID); - }; + public KeyAgreementConnection accept() throws IOException { + Socket s = ss.accept(); + if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection"); + return new KeyAgreementConnection(new TcpTransportConnection( + LanTcpPlugin.this, s), ID); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/TcpPlugin.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/TcpPlugin.java index af6922793..9b686fc86 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/TcpPlugin.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/tcp/TcpPlugin.java @@ -297,7 +297,7 @@ abstract class TcpPlugin implements DuplexPlugin { @Override public DuplexTransportConnection createKeyAgreementConnection( - byte[] commitment, BdfList descriptor, long timeout) { + byte[] commitment, BdfList descriptor) { throw new UnsupportedOperationException(); } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/tcp/LanTcpPluginTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/tcp/LanTcpPluginTest.java index ede3df810..b5449ed34 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/tcp/LanTcpPluginTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/tcp/LanTcpPluginTest.java @@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin.tcp; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.data.BdfList; -import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection; import org.briarproject.bramble.api.keyagreement.KeyAgreementListener; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.Backoff; @@ -26,11 +25,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.Hashtable; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.concurrent.TimeUnit.SECONDS; @@ -195,9 +192,16 @@ public class LanTcpPluginTest extends BrambleTestCase { KeyAgreementListener kal = plugin.createKeyAgreementListener(new byte[COMMIT_LENGTH]); assertNotNull(kal); - Callable c = kal.listen(); - FutureTask f = new FutureTask<>(c); - new Thread(f).start(); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean error = new AtomicBoolean(false); + new Thread(() -> { + try { + kal.accept(); + latch.countDown(); + } catch (IOException e) { + error.set(true); + } + }).start(); // The plugin should have bound a socket and stored the port number BdfList descriptor = kal.getDescriptor(); assertEquals(3, descriptor.size()); @@ -213,10 +217,12 @@ public class LanTcpPluginTest extends BrambleTestCase { InetSocketAddress socketAddr = new InetSocketAddress(addr, port); Socket s = new Socket(); s.connect(socketAddr, 100); - assertNotNull(f.get(5, SECONDS)); + // Check that the connection was accepted + assertTrue(latch.await(5, SECONDS)); + assertFalse(error.get()); + // Clean up s.close(); kal.close(); - // Stop the plugin plugin.stop(); } @@ -262,7 +268,7 @@ public class LanTcpPluginTest extends BrambleTestCase { descriptor.add(local.getPort()); // Connect to the port DuplexTransportConnection d = plugin.createKeyAgreementConnection( - new byte[COMMIT_LENGTH], descriptor, 5000); + new byte[COMMIT_LENGTH], descriptor); assertNotNull(d); // Check that the connection was accepted assertTrue(latch.await(5, SECONDS)); diff --git a/bramble-j2se/src/main/java/org/briarproject/bramble/plugin/modem/ModemPlugin.java b/bramble-j2se/src/main/java/org/briarproject/bramble/plugin/modem/ModemPlugin.java index 269e7feac..340b27c8e 100644 --- a/bramble-j2se/src/main/java/org/briarproject/bramble/plugin/modem/ModemPlugin.java +++ b/bramble-j2se/src/main/java/org/briarproject/bramble/plugin/modem/ModemPlugin.java @@ -177,7 +177,7 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback { @Override public DuplexTransportConnection createKeyAgreementConnection( - byte[] commitment, BdfList descriptor, long timeout) { + byte[] commitment, BdfList descriptor) { throw new UnsupportedOperationException(); }