Refactor key agreement connection choosing.

This commit is contained in:
akwizgran
2018-02-27 17:51:25 +00:00
parent 53b38d83e8
commit cbb65ec807
14 changed files with 298 additions and 227 deletions

View File

@@ -614,7 +614,7 @@ class TorPlugin implements DuplexPlugin, EventHandler, EventListener {
@Override
public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) {
byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException();
}

View File

@@ -2,7 +2,7 @@ package org.briarproject.bramble.api.keyagreement;
import org.briarproject.bramble.api.data.BdfList;
import java.util.concurrent.Callable;
import java.io.IOException;
/**
* An class for managing a particular key agreement listener.
@@ -24,11 +24,11 @@ public abstract class KeyAgreementListener {
}
/**
* Starts listening for incoming connections, and returns a Callable that
* will return a KeyAgreementConnection when an incoming connection is
* received.
* Blocks until an incoming connection is received and returns it.
*
* @throws IOException if an error occurs or {@link #close()} is called.
*/
public abstract Callable<KeyAgreementConnection> listen();
public abstract KeyAgreementConnection accept() throws IOException;
/**
* Closes the underlying server socket.

View File

@@ -36,9 +36,9 @@ public interface DuplexPlugin extends Plugin {
/**
* Attempts to connect to the remote peer specified in the given descriptor.
* Returns null if no connection can be established within the given time.
* Returns null if no connection can be established.
*/
@Nullable
DuplexTransportConnection createKeyAgreementConnection(
byte[] remoteCommitment, BdfList descriptor, long timeout);
byte[] remoteCommitment, BdfList descriptor);
}

View File

@@ -0,0 +1,39 @@
package org.briarproject.bramble.keyagreement;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import javax.annotation.Nullable;
interface ConnectionChooser {
/**
* Adds a connection to the set of connections that may be chosen. If
* {@link #stop()} has already been called, the connection will be closed
* immediately.
*/
void addConnection(KeyAgreementConnection c);
/**
* Chooses one of the connections passed to
* {@link #addConnection(KeyAgreementConnection)} and returns it,
* waiting up to the given amount of time for a suitable connection to
* become available. Returns null if the time elapses without a suitable
* connection becoming available.
*
* @param alice true if the local party is Alice
* @param timeout the timeout in milliseconds
* @throws InterruptedException if the thread is interrupted while waiting
* for a suitable connection to become available
*/
@Nullable
KeyAgreementConnection chooseConnection(boolean alice, long timeout)
throws InterruptedException;
/**
* Stops the chooser from accepting new connections. Closes any connections
* already passed to {@link #addConnection(KeyAgreementConnection)}
* and not chosen. Any connections subsequently passed to
* {@link #addConnection(KeyAgreementConnection)} will be closed.
*/
void stop();
}

View File

@@ -0,0 +1,153 @@
package org.briarproject.bramble.keyagreement;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.keyagreement.event.KeyAgreementWaitingEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
@NotNullByDefault
@ThreadSafe
class ConnectionChooserImpl implements ConnectionChooser {
private static final Logger LOG =
Logger.getLogger(ConnectionChooserImpl.class.getName());
private final EventBus eventBus;
private final Clock clock;
private final Object lock = new Object();
private final List<KeyAgreementConnection> connections =
new ArrayList<>(); // Locking: lock
private boolean stopped = false; // Locking: lock
@Inject
ConnectionChooserImpl(EventBus eventBus, Clock clock) {
this.eventBus = eventBus;
this.clock = clock;
}
@Override
public void addConnection(KeyAgreementConnection conn) {
boolean close = false;
synchronized (lock) {
if (stopped) {
// Already stopped, close the connection
close = true;
} else {
connections.add(conn);
lock.notifyAll();
}
}
if (close) tryToClose(conn.getConnection());
}
@Nullable
@Override
public KeyAgreementConnection chooseConnection(boolean alice, long timeout)
throws InterruptedException {
if (alice) return chooseConnectionAlice(timeout);
else return chooseConnectionBob(timeout);
}
@Nullable
private KeyAgreementConnection chooseConnectionAlice(long timeout)
throws InterruptedException {
LOG.info("Choosing connection for Alice");
long now = clock.currentTimeMillis();
long end = now + timeout;
KeyAgreementConnection chosen;
synchronized (lock) {
// Wait until we're stopped, a connection is added, or we time out
while (!stopped && connections.isEmpty() && now < end) {
lock.wait(end - now);
now = clock.currentTimeMillis();
}
if (connections.isEmpty()) {
LOG.info("No suitable connection for Alice");
return null;
}
// Choose the first connection
chosen = connections.remove(0);
}
if (LOG.isLoggable(INFO))
LOG.info("Choosing " + chosen.getTransportId());
return chosen;
}
@Nullable
private KeyAgreementConnection chooseConnectionBob(long timeout)
throws InterruptedException {
LOG.info("Choosing connection for Bob");
// Bob waits here for Alice to scan his QR code, determine her role,
// choose a connection and send her key
eventBus.broadcast(new KeyAgreementWaitingEvent());
long now = clock.currentTimeMillis();
long end = now + timeout;
synchronized (lock) {
while (!stopped && now < end) {
// Check whether any connection has data available
Iterator<KeyAgreementConnection> it = connections.iterator();
while (it.hasNext()) {
KeyAgreementConnection c = it.next();
try {
int available = c.getConnection().getReader()
.getInputStream().available();
if (available > 0) {
if (LOG.isLoggable(INFO))
LOG.info("Choosing " + c.getTransportId());
it.remove();
return c;
}
} catch (IOException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
tryToClose(c.getConnection());
it.remove();
}
}
// Wait for 1 second before checking again
lock.wait(Math.min(1000, end - now));
now = clock.currentTimeMillis();
}
}
LOG.info("No suitable connection for Bob");
return null;
}
@Override
public void stop() {
List<KeyAgreementConnection> unused;
synchronized (lock) {
stopped = true;
unused = new ArrayList<>(connections);
connections.clear();
lock.notifyAll();
}
if (LOG.isLoggable(INFO))
LOG.info("Closing " + unused.size() + " unused connections");
for (KeyAgreementConnection c : unused) tryToClose(c.getConnection());
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getReader().dispose(false, true);
conn.getWriter().dispose(false);
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
}

View File

@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.keyagreement.KeyAgreementListener;
import org.briarproject.bramble.api.keyagreement.Payload;
import org.briarproject.bramble.api.keyagreement.TransportDescriptor;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.plugin.PluginManager;
@@ -16,20 +17,14 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CONNECTION_TIMEOUT;
@@ -37,40 +32,31 @@ import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CO
@NotNullByDefault
class KeyAgreementConnector {
interface Callbacks {
void connectionWaiting();
}
private static final Logger LOG =
Logger.getLogger(KeyAgreementConnector.class.getName());
private final Callbacks callbacks;
private final Clock clock;
private final KeyAgreementCrypto keyAgreementCrypto;
private final PluginManager pluginManager;
private final Executor ioExecutor;
private final CompletionService<KeyAgreementConnection> connect;
private final ConnectionChooser connectionChooser;
private final List<KeyAgreementListener> listeners = new ArrayList<>();
private final List<Future<KeyAgreementConnection>> pending =
new ArrayList<>();
private final List<KeyAgreementListener> listeners =
new CopyOnWriteArrayList<>();
private volatile boolean connecting = false;
private volatile boolean alice = false;
private volatile boolean stopped = false;
KeyAgreementConnector(Callbacks callbacks, Clock clock,
KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager,
Executor ioExecutor) {
this.callbacks = callbacks;
KeyAgreementConnector(Clock clock, KeyAgreementCrypto keyAgreementCrypto,
PluginManager pluginManager, Executor ioExecutor,
ConnectionChooser connectionChooser) {
this.clock = clock;
this.keyAgreementCrypto = keyAgreementCrypto;
this.pluginManager = pluginManager;
this.ioExecutor = ioExecutor;
connect = new ExecutorCompletionService<>(ioExecutor);
this.connectionChooser = connectionChooser;
}
public Payload listen(KeyPair localKeyPair) {
Payload listen(KeyPair localKeyPair) {
LOG.info("Starting BQP listeners");
// Derive commitment
byte[] commitment = keyAgreementCrypto.deriveKeyCommitment(
@@ -83,10 +69,16 @@ class KeyAgreementConnector {
if (l != null) {
TransportId id = plugin.getId();
descriptors.add(new TransportDescriptor(id, l.getDescriptor()));
if (LOG.isLoggable(INFO))
LOG.info("Creating incoming task for " + id);
pending.add(connect.submit(new ReadableTask(l.listen())));
if (LOG.isLoggable(INFO)) LOG.info("Listening via " + id);
listeners.add(l);
ioExecutor.execute(() -> {
try {
connectionChooser.addConnection(l.accept());
} catch (IOException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
});
}
}
return new Payload(commitment, descriptors);
@@ -95,19 +87,13 @@ class KeyAgreementConnector {
void stopListening() {
LOG.info("Stopping BQP listeners");
stopped = true;
for (KeyAgreementListener l : listeners) {
l.close();
}
for (KeyAgreementListener l : listeners) l.close();
listeners.clear();
connectionChooser.stop();
}
@Nullable
public KeyAgreementTransport connect(Payload remotePayload, boolean alice) {
// Let the ReadableTasks know if we are Alice
this.connecting = true;
this.alice = alice;
long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT;
// Start connecting over supported transports
if (LOG.isLoggable(INFO)) {
LOG.info("Starting outgoing BQP connections as "
@@ -116,165 +102,55 @@ class KeyAgreementConnector {
for (TransportDescriptor d : remotePayload.getTransportDescriptors()) {
Plugin p = pluginManager.getPlugin(d.getId());
if (p instanceof DuplexPlugin) {
DuplexPlugin plugin = (DuplexPlugin) p;
if (LOG.isLoggable(INFO))
LOG.info("Creating outgoing task for " + d.getId());
pending.add(connect.submit(new ReadableTask(
new ConnectorTask(plugin, remotePayload.getCommitment(),
d.getDescriptor(), end))));
LOG.info("Connecting via " + d.getId());
DuplexPlugin plugin = (DuplexPlugin) p;
byte[] commitment = remotePayload.getCommitment();
BdfList descriptor = d.getDescriptor();
ioExecutor.execute(() -> {
try {
KeyAgreementConnection c =
connect(plugin, commitment, descriptor);
if (c != null) connectionChooser.addConnection(c);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting to connect");
}
});
}
}
// Get chosen connection
KeyAgreementConnection chosen = null;
try {
long now = clock.currentTimeMillis();
Future<KeyAgreementConnection> f =
connect.poll(end - now, MILLISECONDS);
if (f == null) return null; // No task completed within the timeout
chosen = f.get();
if (chosen == null) return null; // We've been stopped
KeyAgreementConnection chosen = connectionChooser.chooseConnection(
alice, CONNECTION_TIMEOUT);
if (chosen == null) return null; // No suitable connection
return new KeyAgreementTransport(chosen);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for connection");
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException | IOException e) {
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null;
} finally {
stopListening();
// Close all other connections
closePending(chosen);
}
}
private void closePending(@Nullable KeyAgreementConnection chosen) {
List<Future<KeyAgreementConnection>> unfinished = new ArrayList<>();
try {
for (Future<KeyAgreementConnection> f : pending) {
if (f.isDone()) {
LOG.info("Task is already done");
closeIfNotChosen(f, chosen);
} else {
LOG.info("Task is not done");
unfinished.add(f);
}
}
} catch (InterruptedException e) {
LOG.info("Interrupted while closing connections");
Thread.currentThread().interrupt();
}
for (Future<KeyAgreementConnection> f : unfinished) {
ioExecutor.execute(() -> {
try {
closeIfNotChosen(f, chosen);
} catch (InterruptedException e) {
LOG.info("Interrupted while closing connections");
}
});
}
}
private void closeIfNotChosen(Future<KeyAgreementConnection> f,
@Nullable KeyAgreementConnection chosen)
throws InterruptedException {
try {
KeyAgreementConnection c = f.get();
if (c == null) {
LOG.info("Result is null");
} else if (c == chosen) {
LOG.info("Not closing chosen connection");
} else {
LOG.info("Closing unchosen connection");
tryToClose(c.getConnection());
}
} catch (ExecutionException e) {
if (LOG.isLoggable(INFO))
LOG.info("Task threw exception: " + e);
}
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getReader().dispose(false, true);
conn.getWriter().dispose(false);
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
private class ConnectorTask implements Callable<KeyAgreementConnection> {
private final byte[] commitment;
private final BdfList descriptor;
private final long end;
private final DuplexPlugin plugin;
private ConnectorTask(DuplexPlugin plugin, byte[] commitment,
BdfList descriptor, long end) {
this.plugin = plugin;
this.commitment = commitment;
this.descriptor = descriptor;
this.end = end;
}
@Override
@Nullable
public KeyAgreementConnection call() throws Exception {
// Repeat attempts until we connect, get interrupted, or time out
while (!stopped) {
long now = clock.currentTimeMillis();
if (now >= end) throw new IOException("Timed out");
DuplexTransportConnection conn =
plugin.createKeyAgreementConnection(commitment,
descriptor, end - now);
if (conn != null) {
if (LOG.isLoggable(INFO))
LOG.info(plugin.getId() + ": Outgoing connection");
return new KeyAgreementConnection(conn, plugin.getId());
}
// Wait 2s before retry (to circumvent transient failures)
Thread.sleep(2000);
}
return null;
}
}
private class ReadableTask implements Callable<KeyAgreementConnection> {
private final Callable<KeyAgreementConnection> connectionTask;
private ReadableTask(Callable<KeyAgreementConnection> connectionTask) {
this.connectionTask = connectionTask;
}
@Override
@Nullable
public KeyAgreementConnection call() throws Exception {
KeyAgreementConnection c = connectionTask.call();
if (c == null) return null;
InputStream in = c.getConnection().getReader().getInputStream();
boolean waitingSent = false;
try {
while (!stopped && !alice && in.available() == 0) {
if (!waitingSent && connecting && !alice) {
// Bob waits here until Alice obtains his payload.
callbacks.connectionWaiting();
waitingSent = true;
}
if (LOG.isLoggable(INFO))
LOG.info(c.getTransportId() + ": Waiting for data");
Thread.sleep(1000);
}
} catch (IOException | InterruptedException e) {
if (LOG.isLoggable(INFO)) LOG.info("Closing connection: " + e);
tryToClose(c.getConnection());
throw e;
}
if (!stopped && !alice && LOG.isLoggable(INFO))
LOG.info(c.getTransportId() + ": Data available");
return c;
@Nullable
@IoExecutor
private KeyAgreementConnection connect(DuplexPlugin plugin,
byte[] commitment, BdfList descriptor) throws InterruptedException {
// Repeat attempts until we time out, get stopped, or get interrupted
long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT;
while (!stopped && clock.currentTimeMillis() < end) {
DuplexTransportConnection conn =
plugin.createKeyAgreementConnection(commitment, descriptor);
if (conn != null)
return new KeyAgreementConnection(conn, plugin.getId());
// Wait 2s before retry (to circumvent transient failures)
Thread.sleep(2000);
}
return null;
}
}

View File

@@ -27,4 +27,10 @@ public class KeyAgreementModule {
PayloadParser providePayloadParser(BdfReaderFactory bdfReaderFactory) {
return new PayloadParserImpl(bdfReaderFactory);
}
@Provides
ConnectionChooser provideConnectionChooser(
ConnectionChooserImpl connectionChooser) {
return connectionChooser;
}
}

View File

@@ -99,7 +99,8 @@ class KeyAgreementProtocol {
PublicKey theirPublicKey;
if (alice) {
sendKey();
// Alice waits here until Bob obtains her payload.
// Alice waits here for Bob to scan her QR code, determine his
// role, receive her key and respond with his key
callbacks.connectionWaiting();
theirPublicKey = receiveKey();
} else {

View File

@@ -32,8 +32,7 @@ import static java.util.logging.Level.WARNING;
@MethodsNotNullByDefault
@ParametersNotNullByDefault
class KeyAgreementTaskImpl extends Thread implements
KeyAgreementTask, KeyAgreementConnector.Callbacks,
KeyAgreementProtocol.Callbacks {
KeyAgreementTask, KeyAgreementProtocol.Callbacks {
private static final Logger LOG =
Logger.getLogger(KeyAgreementTaskImpl.class.getName());
@@ -52,14 +51,15 @@ class KeyAgreementTaskImpl extends Thread implements
KeyAgreementTaskImpl(Clock clock, CryptoComponent crypto,
KeyAgreementCrypto keyAgreementCrypto, EventBus eventBus,
PayloadEncoder payloadEncoder, PluginManager pluginManager,
@IoExecutor Executor ioExecutor) {
@IoExecutor Executor ioExecutor,
ConnectionChooser connectionChooser) {
this.crypto = crypto;
this.keyAgreementCrypto = keyAgreementCrypto;
this.eventBus = eventBus;
this.payloadEncoder = payloadEncoder;
localKeyPair = crypto.generateAgreementKeyPair();
connector = new KeyAgreementConnector(this, clock, keyAgreementCrypto,
pluginManager, ioExecutor);
connector = new KeyAgreementConnector(clock, keyAgreementCrypto,
pluginManager, ioExecutor, connectionChooser);
}
@Override
@@ -73,10 +73,8 @@ class KeyAgreementTaskImpl extends Thread implements
@Override
public synchronized void stopListening() {
if (localPayload != null) {
if (remotePayload == null)
connector.stopListening();
else
interrupt();
if (remotePayload == null) connector.stopListening();
else interrupt();
}
}

View File

@@ -28,7 +28,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -324,7 +323,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
DuplexTransportConnection conn = connect(address, uuid);
if (conn == null) return null;
// TODO: Why don't we reset the backoff here?
return connectionManager.connectionOpened(conn, false) ? conn : null;
return connectionManager.connectionOpened(conn, false) ? conn : null;
}
@Override
@@ -361,7 +360,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
@Override
public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) {
byte[] commitment, BdfList descriptor) {
if (!isRunning()) return null;
String address;
try {
@@ -427,15 +426,12 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
}
@Override
public Callable<KeyAgreementConnection> listen() {
return () -> {
DuplexTransportConnection conn = acceptConnection(ss);
if (LOG.isLoggable(INFO))
LOG.info(ID.getString() + ": Incoming connection");
// The connection limit doesn't apply to key agreement
connectionManager.connectionOpened(conn, true);
return new KeyAgreementConnection(conn, ID);
};
public KeyAgreementConnection accept() throws IOException {
DuplexTransportConnection conn = acceptConnection(ss);
if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
// The connection limit doesn't apply to key agreement
connectionManager.connectionOpened(conn, true);
return new KeyAgreementConnection(conn, ID);
}
@Override

View File

@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -224,7 +223,7 @@ class LanTcpPlugin extends TcpPlugin {
@Override
public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) {
byte[] commitment, BdfList descriptor) {
if (!isRunning()) return null;
InetSocketAddress remote;
try {
@@ -283,14 +282,11 @@ class LanTcpPlugin extends TcpPlugin {
}
@Override
public Callable<KeyAgreementConnection> listen() {
return () -> {
Socket s = ss.accept();
if (LOG.isLoggable(INFO))
LOG.info(ID.getString() + ": Incoming connection");
return new KeyAgreementConnection(
new TcpTransportConnection(LanTcpPlugin.this, s), ID);
};
public KeyAgreementConnection accept() throws IOException {
Socket s = ss.accept();
if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
return new KeyAgreementConnection(new TcpTransportConnection(
LanTcpPlugin.this, s), ID);
}
@Override

View File

@@ -297,7 +297,7 @@ abstract class TcpPlugin implements DuplexPlugin {
@Override
public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) {
byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException();
}

View File

@@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin.tcp;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.data.BdfList;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.keyagreement.KeyAgreementListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.Backoff;
@@ -26,11 +25,9 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -195,9 +192,16 @@ public class LanTcpPluginTest extends BrambleTestCase {
KeyAgreementListener kal =
plugin.createKeyAgreementListener(new byte[COMMIT_LENGTH]);
assertNotNull(kal);
Callable<KeyAgreementConnection> c = kal.listen();
FutureTask<KeyAgreementConnection> f = new FutureTask<>(c);
new Thread(f).start();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean error = new AtomicBoolean(false);
new Thread(() -> {
try {
kal.accept();
latch.countDown();
} catch (IOException e) {
error.set(true);
}
}).start();
// The plugin should have bound a socket and stored the port number
BdfList descriptor = kal.getDescriptor();
assertEquals(3, descriptor.size());
@@ -213,10 +217,12 @@ public class LanTcpPluginTest extends BrambleTestCase {
InetSocketAddress socketAddr = new InetSocketAddress(addr, port);
Socket s = new Socket();
s.connect(socketAddr, 100);
assertNotNull(f.get(5, SECONDS));
// Check that the connection was accepted
assertTrue(latch.await(5, SECONDS));
assertFalse(error.get());
// Clean up
s.close();
kal.close();
// Stop the plugin
plugin.stop();
}
@@ -262,7 +268,7 @@ public class LanTcpPluginTest extends BrambleTestCase {
descriptor.add(local.getPort());
// Connect to the port
DuplexTransportConnection d = plugin.createKeyAgreementConnection(
new byte[COMMIT_LENGTH], descriptor, 5000);
new byte[COMMIT_LENGTH], descriptor);
assertNotNull(d);
// Check that the connection was accepted
assertTrue(latch.await(5, SECONDS));

View File

@@ -177,7 +177,7 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
@Override
public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) {
byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException();
}