From 692db742cfee1cfd0862fef6f79ec4f2fe27392f Mon Sep 17 00:00:00 2001 From: akwizgran Date: Mon, 26 Feb 2018 14:05:59 +0000 Subject: [PATCH] Ensure all key agreement connection tasks are stopped. --- .../keyagreement/KeyAgreementConnector.java | 124 ++++++++++++------ .../BluetoothConnectionManagerImpl.java | 5 +- 2 files changed, 84 insertions(+), 45 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java index 89bf01ee0..50f918e58 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementConnector.java @@ -48,6 +48,7 @@ class KeyAgreementConnector { private final Clock clock; private final KeyAgreementCrypto keyAgreementCrypto; private final PluginManager pluginManager; + private final Executor ioExecutor; private final CompletionService connect; private final List listeners = new ArrayList<>(); @@ -56,6 +57,7 @@ class KeyAgreementConnector { private volatile boolean connecting = false; private volatile boolean alice = false; + private volatile boolean stopped = false; KeyAgreementConnector(Callbacks callbacks, Clock clock, KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager, @@ -64,6 +66,7 @@ class KeyAgreementConnector { this.clock = clock; this.keyAgreementCrypto = keyAgreementCrypto; this.pluginManager = pluginManager; + this.ioExecutor = ioExecutor; connect = new ExecutorCompletionService<>(ioExecutor); } @@ -80,6 +83,8 @@ class KeyAgreementConnector { if (l != null) { TransportId id = plugin.getId(); descriptors.add(new TransportDescriptor(id, l.getDescriptor())); + if (LOG.isLoggable(INFO)) + LOG.info("Creating incoming task for " + id); pending.add(connect.submit(new ReadableTask(l.listen()))); listeners.add(l); } @@ -89,6 +94,7 @@ class KeyAgreementConnector { void stopListening() { LOG.info("Stopping BQP listeners"); + stopped = true; for (KeyAgreementListener l : listeners) { l.close(); } @@ -96,19 +102,23 @@ class KeyAgreementConnector { } @Nullable - public KeyAgreementTransport connect(Payload remotePayload, - boolean alice) { - // Let the listeners know if we are Alice + public KeyAgreementTransport connect(Payload remotePayload, boolean alice) { + // Let the ReadableTasks know if we are Alice this.connecting = true; this.alice = alice; long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT; // Start connecting over supported transports - LOG.info("Starting outgoing BQP connections"); + if (LOG.isLoggable(INFO)) { + LOG.info("Starting outgoing BQP connections as " + + (alice ? "Alice" : "Bob")); + } for (TransportDescriptor d : remotePayload.getTransportDescriptors()) { Plugin p = pluginManager.getPlugin(d.getId()); if (p instanceof DuplexPlugin) { DuplexPlugin plugin = (DuplexPlugin) p; + if (LOG.isLoggable(INFO)) + LOG.info("Creating outgoing task for " + d.getId()); pending.add(connect.submit(new ReadableTask( new ConnectorTask(plugin, remotePayload.getCommitment(), d.getDescriptor(), end)))); @@ -121,9 +131,9 @@ class KeyAgreementConnector { long now = clock.currentTimeMillis(); Future f = connect.poll(end - now, MILLISECONDS); - if (f == null) - return null; // No task completed within the timeout. + if (f == null) return null; // No task completed within the timeout chosen = f.get(); + if (chosen == null) return null; // We've been stopped return new KeyAgreementTransport(chosen); } catch (InterruptedException e) { LOG.info("Interrupted while waiting for connection"); @@ -140,31 +150,55 @@ class KeyAgreementConnector { } private void closePending(@Nullable KeyAgreementConnection chosen) { - for (Future f : pending) { - try { - if (f.cancel(true)) { - LOG.info("Cancelled task"); - } else if (!f.isCancelled()) { - KeyAgreementConnection c = f.get(); - if (c != null && c != chosen) - tryToClose(c.getConnection(), false); + List> unfinished = new ArrayList<>(); + try { + for (Future f : pending) { + if (f.isDone()) { + LOG.info("Task is already done"); + closeIfNotChosen(f, chosen); + } else { + LOG.info("Task is not done"); + unfinished.add(f); } - } catch (InterruptedException e) { - LOG.info("Interrupted while closing sockets"); - Thread.currentThread().interrupt(); - return; - } catch (ExecutionException e) { - if (LOG.isLoggable(INFO)) LOG.info(e.toString()); } + } catch (InterruptedException e) { + LOG.info("Interrupted while closing connections"); + Thread.currentThread().interrupt(); + } + for (Future f : unfinished) { + ioExecutor.execute(() -> { + try { + closeIfNotChosen(f, chosen); + } catch (InterruptedException e) { + LOG.info("Interrupted while closing connections"); + } + }); } } - private void tryToClose(DuplexTransportConnection conn, boolean exception) { + private void closeIfNotChosen(Future f, + @Nullable KeyAgreementConnection chosen) + throws InterruptedException { try { + KeyAgreementConnection c = f.get(); + if (c == null) { + LOG.info("Result is null"); + } else if (c == chosen) { + LOG.info("Not closing chosen connection"); + } else { + LOG.info("Closing unchosen connection"); + tryToClose(c.getConnection()); + } + } catch (ExecutionException e) { if (LOG.isLoggable(INFO)) - LOG.info("Closing connection, exception: " + exception); - conn.getReader().dispose(exception, true); - conn.getWriter().dispose(exception); + LOG.info("Task threw exception: " + e); + } + } + + private void tryToClose(DuplexTransportConnection conn) { + try { + conn.getReader().dispose(false, true); + conn.getWriter().dispose(false); } catch (IOException e) { if (LOG.isLoggable(INFO)) LOG.info(e.toString()); } @@ -186,28 +220,28 @@ class KeyAgreementConnector { } @Override + @Nullable public KeyAgreementConnection call() throws Exception { // Repeat attempts until we connect, get interrupted, or time out - while (true) { + while (!stopped) { long now = clock.currentTimeMillis(); - if (now > end) throw new IOException(); + if (now >= end) throw new IOException("Timed out"); DuplexTransportConnection conn = plugin.createKeyAgreementConnection(commitment, descriptor, end - now); if (conn != null) { if (LOG.isLoggable(INFO)) - LOG.info(plugin.getId().getString() + - ": Outgoing connection"); + LOG.info(plugin.getId() + ": Outgoing connection"); return new KeyAgreementConnection(conn, plugin.getId()); } // Wait 2s before retry (to circumvent transient failures) Thread.sleep(2000); } + return null; } } - private class ReadableTask - implements Callable { + private class ReadableTask implements Callable { private final Callable connectionTask; @@ -216,24 +250,30 @@ class KeyAgreementConnector { } @Override + @Nullable public KeyAgreementConnection call() throws Exception { KeyAgreementConnection c = connectionTask.call(); + if (c == null) return null; InputStream in = c.getConnection().getReader().getInputStream(); boolean waitingSent = false; - while (!alice && in.available() == 0) { - if (!waitingSent && connecting && !alice) { - // Bob waits here until Alice obtains his payload. - callbacks.connectionWaiting(); - waitingSent = true; + try { + while (!stopped && !alice && in.available() == 0) { + if (!waitingSent && connecting && !alice) { + // Bob waits here until Alice obtains his payload. + callbacks.connectionWaiting(); + waitingSent = true; + } + if (LOG.isLoggable(INFO)) + LOG.info(c.getTransportId() + ": Waiting for data"); + Thread.sleep(1000); } - if (LOG.isLoggable(INFO)) { - LOG.info(c.getTransportId().getString() + - ": Waiting for connection"); - } - Thread.sleep(1000); + } catch (IOException | InterruptedException e) { + if (LOG.isLoggable(INFO)) LOG.info("Closing connection: " + e); + tryToClose(c.getConnection()); + throw e; } - if (!alice && LOG.isLoggable(INFO)) - LOG.info(c.getTransportId().getString() + ": Data available"); + if (!stopped && !alice && LOG.isLoggable(INFO)) + LOG.info(c.getTransportId() + ": Data available"); return c; } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionManagerImpl.java index e1dd7d675..706ace7f8 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/bluetooth/BluetoothConnectionManagerImpl.java @@ -10,7 +10,6 @@ import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; import static java.util.logging.Level.INFO; -import static java.util.logging.Level.WARNING; @NotNullByDefault @ThreadSafe @@ -61,10 +60,10 @@ class BluetoothConnectionManagerImpl implements BluetoothConnectionManager { private void tryToClose(DuplexTransportConnection conn) { try { + conn.getReader().dispose(false, true); conn.getWriter().dispose(false); - conn.getReader().dispose(false, false); } catch (IOException e) { - if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + if (LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e); } }