Ensure all key agreement connection tasks are stopped.

This commit is contained in:
akwizgran
2018-02-26 14:05:59 +00:00
parent 5b177c9901
commit 692db742cf
2 changed files with 84 additions and 45 deletions

View File

@@ -48,6 +48,7 @@ class KeyAgreementConnector {
private final Clock clock; private final Clock clock;
private final KeyAgreementCrypto keyAgreementCrypto; private final KeyAgreementCrypto keyAgreementCrypto;
private final PluginManager pluginManager; private final PluginManager pluginManager;
private final Executor ioExecutor;
private final CompletionService<KeyAgreementConnection> connect; private final CompletionService<KeyAgreementConnection> connect;
private final List<KeyAgreementListener> listeners = new ArrayList<>(); private final List<KeyAgreementListener> listeners = new ArrayList<>();
@@ -56,6 +57,7 @@ class KeyAgreementConnector {
private volatile boolean connecting = false; private volatile boolean connecting = false;
private volatile boolean alice = false; private volatile boolean alice = false;
private volatile boolean stopped = false;
KeyAgreementConnector(Callbacks callbacks, Clock clock, KeyAgreementConnector(Callbacks callbacks, Clock clock,
KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager, KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager,
@@ -64,6 +66,7 @@ class KeyAgreementConnector {
this.clock = clock; this.clock = clock;
this.keyAgreementCrypto = keyAgreementCrypto; this.keyAgreementCrypto = keyAgreementCrypto;
this.pluginManager = pluginManager; this.pluginManager = pluginManager;
this.ioExecutor = ioExecutor;
connect = new ExecutorCompletionService<>(ioExecutor); connect = new ExecutorCompletionService<>(ioExecutor);
} }
@@ -80,6 +83,8 @@ class KeyAgreementConnector {
if (l != null) { if (l != null) {
TransportId id = plugin.getId(); TransportId id = plugin.getId();
descriptors.add(new TransportDescriptor(id, l.getDescriptor())); descriptors.add(new TransportDescriptor(id, l.getDescriptor()));
if (LOG.isLoggable(INFO))
LOG.info("Creating incoming task for " + id);
pending.add(connect.submit(new ReadableTask(l.listen()))); pending.add(connect.submit(new ReadableTask(l.listen())));
listeners.add(l); listeners.add(l);
} }
@@ -89,6 +94,7 @@ class KeyAgreementConnector {
void stopListening() { void stopListening() {
LOG.info("Stopping BQP listeners"); LOG.info("Stopping BQP listeners");
stopped = true;
for (KeyAgreementListener l : listeners) { for (KeyAgreementListener l : listeners) {
l.close(); l.close();
} }
@@ -96,19 +102,23 @@ class KeyAgreementConnector {
} }
@Nullable @Nullable
public KeyAgreementTransport connect(Payload remotePayload, public KeyAgreementTransport connect(Payload remotePayload, boolean alice) {
boolean alice) { // Let the ReadableTasks know if we are Alice
// Let the listeners know if we are Alice
this.connecting = true; this.connecting = true;
this.alice = alice; this.alice = alice;
long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT; long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT;
// Start connecting over supported transports // Start connecting over supported transports
LOG.info("Starting outgoing BQP connections"); if (LOG.isLoggable(INFO)) {
LOG.info("Starting outgoing BQP connections as "
+ (alice ? "Alice" : "Bob"));
}
for (TransportDescriptor d : remotePayload.getTransportDescriptors()) { for (TransportDescriptor d : remotePayload.getTransportDescriptors()) {
Plugin p = pluginManager.getPlugin(d.getId()); Plugin p = pluginManager.getPlugin(d.getId());
if (p instanceof DuplexPlugin) { if (p instanceof DuplexPlugin) {
DuplexPlugin plugin = (DuplexPlugin) p; DuplexPlugin plugin = (DuplexPlugin) p;
if (LOG.isLoggable(INFO))
LOG.info("Creating outgoing task for " + d.getId());
pending.add(connect.submit(new ReadableTask( pending.add(connect.submit(new ReadableTask(
new ConnectorTask(plugin, remotePayload.getCommitment(), new ConnectorTask(plugin, remotePayload.getCommitment(),
d.getDescriptor(), end)))); d.getDescriptor(), end))));
@@ -121,9 +131,9 @@ class KeyAgreementConnector {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
Future<KeyAgreementConnection> f = Future<KeyAgreementConnection> f =
connect.poll(end - now, MILLISECONDS); connect.poll(end - now, MILLISECONDS);
if (f == null) if (f == null) return null; // No task completed within the timeout
return null; // No task completed within the timeout.
chosen = f.get(); chosen = f.get();
if (chosen == null) return null; // We've been stopped
return new KeyAgreementTransport(chosen); return new KeyAgreementTransport(chosen);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for connection"); LOG.info("Interrupted while waiting for connection");
@@ -140,31 +150,55 @@ class KeyAgreementConnector {
} }
private void closePending(@Nullable KeyAgreementConnection chosen) { private void closePending(@Nullable KeyAgreementConnection chosen) {
for (Future<KeyAgreementConnection> f : pending) { List<Future<KeyAgreementConnection>> unfinished = new ArrayList<>();
try { try {
if (f.cancel(true)) { for (Future<KeyAgreementConnection> f : pending) {
LOG.info("Cancelled task"); if (f.isDone()) {
} else if (!f.isCancelled()) { LOG.info("Task is already done");
KeyAgreementConnection c = f.get(); closeIfNotChosen(f, chosen);
if (c != null && c != chosen) } else {
tryToClose(c.getConnection(), false); LOG.info("Task is not done");
unfinished.add(f);
} }
} catch (InterruptedException e) {
LOG.info("Interrupted while closing sockets");
Thread.currentThread().interrupt();
return;
} catch (ExecutionException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
} }
} catch (InterruptedException e) {
LOG.info("Interrupted while closing connections");
Thread.currentThread().interrupt();
}
for (Future<KeyAgreementConnection> f : unfinished) {
ioExecutor.execute(() -> {
try {
closeIfNotChosen(f, chosen);
} catch (InterruptedException e) {
LOG.info("Interrupted while closing connections");
}
});
} }
} }
private void tryToClose(DuplexTransportConnection conn, boolean exception) { private void closeIfNotChosen(Future<KeyAgreementConnection> f,
@Nullable KeyAgreementConnection chosen)
throws InterruptedException {
try { try {
KeyAgreementConnection c = f.get();
if (c == null) {
LOG.info("Result is null");
} else if (c == chosen) {
LOG.info("Not closing chosen connection");
} else {
LOG.info("Closing unchosen connection");
tryToClose(c.getConnection());
}
} catch (ExecutionException e) {
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))
LOG.info("Closing connection, exception: " + exception); LOG.info("Task threw exception: " + e);
conn.getReader().dispose(exception, true); }
conn.getWriter().dispose(exception); }
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getReader().dispose(false, true);
conn.getWriter().dispose(false);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString()); if (LOG.isLoggable(INFO)) LOG.info(e.toString());
} }
@@ -186,28 +220,28 @@ class KeyAgreementConnector {
} }
@Override @Override
@Nullable
public KeyAgreementConnection call() throws Exception { public KeyAgreementConnection call() throws Exception {
// Repeat attempts until we connect, get interrupted, or time out // Repeat attempts until we connect, get interrupted, or time out
while (true) { while (!stopped) {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
if (now > end) throw new IOException(); if (now >= end) throw new IOException("Timed out");
DuplexTransportConnection conn = DuplexTransportConnection conn =
plugin.createKeyAgreementConnection(commitment, plugin.createKeyAgreementConnection(commitment,
descriptor, end - now); descriptor, end - now);
if (conn != null) { if (conn != null) {
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))
LOG.info(plugin.getId().getString() + LOG.info(plugin.getId() + ": Outgoing connection");
": Outgoing connection");
return new KeyAgreementConnection(conn, plugin.getId()); return new KeyAgreementConnection(conn, plugin.getId());
} }
// Wait 2s before retry (to circumvent transient failures) // Wait 2s before retry (to circumvent transient failures)
Thread.sleep(2000); Thread.sleep(2000);
} }
return null;
} }
} }
private class ReadableTask private class ReadableTask implements Callable<KeyAgreementConnection> {
implements Callable<KeyAgreementConnection> {
private final Callable<KeyAgreementConnection> connectionTask; private final Callable<KeyAgreementConnection> connectionTask;
@@ -216,24 +250,30 @@ class KeyAgreementConnector {
} }
@Override @Override
@Nullable
public KeyAgreementConnection call() throws Exception { public KeyAgreementConnection call() throws Exception {
KeyAgreementConnection c = connectionTask.call(); KeyAgreementConnection c = connectionTask.call();
if (c == null) return null;
InputStream in = c.getConnection().getReader().getInputStream(); InputStream in = c.getConnection().getReader().getInputStream();
boolean waitingSent = false; boolean waitingSent = false;
while (!alice && in.available() == 0) { try {
if (!waitingSent && connecting && !alice) { while (!stopped && !alice && in.available() == 0) {
// Bob waits here until Alice obtains his payload. if (!waitingSent && connecting && !alice) {
callbacks.connectionWaiting(); // Bob waits here until Alice obtains his payload.
waitingSent = true; callbacks.connectionWaiting();
waitingSent = true;
}
if (LOG.isLoggable(INFO))
LOG.info(c.getTransportId() + ": Waiting for data");
Thread.sleep(1000);
} }
if (LOG.isLoggable(INFO)) { } catch (IOException | InterruptedException e) {
LOG.info(c.getTransportId().getString() + if (LOG.isLoggable(INFO)) LOG.info("Closing connection: " + e);
": Waiting for connection"); tryToClose(c.getConnection());
} throw e;
Thread.sleep(1000);
} }
if (!alice && LOG.isLoggable(INFO)) if (!stopped && !alice && LOG.isLoggable(INFO))
LOG.info(c.getTransportId().getString() + ": Data available"); LOG.info(c.getTransportId() + ": Data available");
return c; return c;
} }
} }

View File

@@ -10,7 +10,6 @@ import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
@NotNullByDefault @NotNullByDefault
@ThreadSafe @ThreadSafe
@@ -61,10 +60,10 @@ class BluetoothConnectionManagerImpl implements BluetoothConnectionManager {
private void tryToClose(DuplexTransportConnection conn) { private void tryToClose(DuplexTransportConnection conn) {
try { try {
conn.getReader().dispose(false, true);
conn.getWriter().dispose(false); conn.getWriter().dispose(false);
conn.getReader().dispose(false, false);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e);
} }
} }