Merge branch '283-key-exchange-connections' into 'master'

Refactor key agreement connection choosing

Closes #283

See merge request akwizgran/briar!711
This commit is contained in:
akwizgran
2018-03-07 10:37:45 +00:00
14 changed files with 258 additions and 150 deletions

View File

@@ -614,7 +614,7 @@ class TorPlugin implements DuplexPlugin, EventHandler, EventListener {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@@ -2,7 +2,7 @@ package org.briarproject.bramble.api.keyagreement;
import org.briarproject.bramble.api.data.BdfList; import org.briarproject.bramble.api.data.BdfList;
import java.util.concurrent.Callable; import java.io.IOException;
/** /**
* An class for managing a particular key agreement listener. * An class for managing a particular key agreement listener.
@@ -24,11 +24,11 @@ public abstract class KeyAgreementListener {
} }
/** /**
* Starts listening for incoming connections, and returns a Callable that * Blocks until an incoming connection is received and returns it.
* will return a KeyAgreementConnection when an incoming connection is *
* received. * @throws IOException if an error occurs or {@link #close()} is called.
*/ */
public abstract Callable<KeyAgreementConnection> listen(); public abstract KeyAgreementConnection accept() throws IOException;
/** /**
* Closes the underlying server socket. * Closes the underlying server socket.

View File

@@ -36,9 +36,9 @@ public interface DuplexPlugin extends Plugin {
/** /**
* Attempts to connect to the remote peer specified in the given descriptor. * Attempts to connect to the remote peer specified in the given descriptor.
* Returns null if no connection can be established within the given time. * Returns null if no connection can be established.
*/ */
@Nullable @Nullable
DuplexTransportConnection createKeyAgreementConnection( DuplexTransportConnection createKeyAgreementConnection(
byte[] remoteCommitment, BdfList descriptor, long timeout); byte[] remoteCommitment, BdfList descriptor);
} }

View File

@@ -0,0 +1,36 @@
package org.briarproject.bramble.keyagreement;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
interface ConnectionChooser {
/**
* Submits a connection task to the chooser.
*/
void submit(Callable<KeyAgreementConnection> task);
/**
* Returns a connection returned by any of the tasks submitted to the
* chooser, waiting up to the given amount of time for a connection if
* necessary. Returns null if the time elapses without a connection
* becoming available.
*
* @param timeout the timeout in milliseconds
* @throws InterruptedException if the thread is interrupted while waiting
* for a connection to become available
*/
@Nullable
KeyAgreementConnection poll(long timeout) throws InterruptedException;
/**
* Stops the chooser. Any connections already returned to the chooser are
* closed unless they have been removed from the chooser by calling
* {@link #poll(long)}. Any connections subsequently returned to the
* chooser will also be closed.
*/
void stop();
}

View File

@@ -0,0 +1,112 @@
package org.briarproject.bramble.keyagreement;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
@NotNullByDefault
@ThreadSafe
class ConnectionChooserImpl implements ConnectionChooser {
private static final Logger LOG =
Logger.getLogger(ConnectionChooserImpl.class.getName());
private final Clock clock;
private final Executor ioExecutor;
private final Object lock = new Object();
// The following are locking: lock
private boolean stopped = false;
private final Queue<KeyAgreementConnection> results = new LinkedList<>();
@Inject
ConnectionChooserImpl(Clock clock, @IoExecutor Executor ioExecutor) {
this.clock = clock;
this.ioExecutor = ioExecutor;
}
@Override
public void submit(Callable<KeyAgreementConnection> task) {
ioExecutor.execute(() -> {
try {
KeyAgreementConnection c = task.call();
if (c != null) addResult(c);
} catch (Exception e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
});
}
@Nullable
@Override
public KeyAgreementConnection poll(long timeout)
throws InterruptedException {
long now = clock.currentTimeMillis();
long end = now + timeout;
synchronized (lock) {
while (!stopped && results.isEmpty() && now < end) {
lock.wait(end - now);
now = clock.currentTimeMillis();
}
return results.poll();
}
}
@Override
public void stop() {
List<KeyAgreementConnection> unused;
synchronized (lock) {
unused = new ArrayList<>(results);
results.clear();
stopped = true;
lock.notifyAll();
}
if (LOG.isLoggable(INFO))
LOG.info("Closing " + unused.size() + " unused connections");
for (KeyAgreementConnection c : unused) tryToClose(c.getConnection());
}
private void addResult(KeyAgreementConnection c) {
if (LOG.isLoggable(INFO))
LOG.info("Got connection for " + c.getTransportId());
boolean close = false;
synchronized (lock) {
if (stopped) {
close = true;
} else {
results.add(c);
lock.notifyAll();
}
}
if (close) {
LOG.info("Already stopped");
tryToClose(c.getConnection());
}
}
private void tryToClose(DuplexTransportConnection conn) {
try {
conn.getReader().dispose(false, true);
conn.getWriter().dispose(false);
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
}

View File

@@ -13,23 +13,19 @@ import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CONNECTION_TIMEOUT; import static org.briarproject.bramble.api.keyagreement.KeyAgreementConstants.CONNECTION_TIMEOUT;
@@ -45,29 +41,27 @@ class KeyAgreementConnector {
Logger.getLogger(KeyAgreementConnector.class.getName()); Logger.getLogger(KeyAgreementConnector.class.getName());
private final Callbacks callbacks; private final Callbacks callbacks;
private final Clock clock;
private final KeyAgreementCrypto keyAgreementCrypto; private final KeyAgreementCrypto keyAgreementCrypto;
private final PluginManager pluginManager; private final PluginManager pluginManager;
private final CompletionService<KeyAgreementConnection> connect; private final ConnectionChooser connectionChooser;
private final List<KeyAgreementListener> listeners = new ArrayList<>(); private final List<KeyAgreementListener> listeners =
private final List<Future<KeyAgreementConnection>> pending = new CopyOnWriteArrayList<>();
new ArrayList<>(); private final CountDownLatch aliceLatch = new CountDownLatch(1);
private final AtomicBoolean waitingSent = new AtomicBoolean(false);
private volatile boolean connecting = false; private volatile boolean alice = false, stopped = false;
private volatile boolean alice = false;
KeyAgreementConnector(Callbacks callbacks, Clock clock, KeyAgreementConnector(Callbacks callbacks,
KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager, KeyAgreementCrypto keyAgreementCrypto, PluginManager pluginManager,
Executor ioExecutor) { ConnectionChooser connectionChooser) {
this.callbacks = callbacks; this.callbacks = callbacks;
this.clock = clock;
this.keyAgreementCrypto = keyAgreementCrypto; this.keyAgreementCrypto = keyAgreementCrypto;
this.pluginManager = pluginManager; this.pluginManager = pluginManager;
connect = new ExecutorCompletionService<>(ioExecutor); this.connectionChooser = connectionChooser;
} }
public Payload listen(KeyPair localKeyPair) { Payload listen(KeyPair localKeyPair) {
LOG.info("Starting BQP listeners"); LOG.info("Starting BQP listeners");
// Derive commitment // Derive commitment
byte[] commitment = keyAgreementCrypto.deriveKeyCommitment( byte[] commitment = keyAgreementCrypto.deriveKeyCommitment(
@@ -80,8 +74,9 @@ class KeyAgreementConnector {
if (l != null) { if (l != null) {
TransportId id = plugin.getId(); TransportId id = plugin.getId();
descriptors.add(new TransportDescriptor(id, l.getDescriptor())); descriptors.add(new TransportDescriptor(id, l.getDescriptor()));
pending.add(connect.submit(new ReadableTask(l.listen()))); if (LOG.isLoggable(INFO)) LOG.info("Listening via " + id);
listeners.add(l); listeners.add(l);
connectionChooser.submit(new ReadableTask(l::accept));
} }
} }
return new Payload(commitment, descriptors); return new Payload(commitment, descriptors);
@@ -89,125 +84,92 @@ class KeyAgreementConnector {
void stopListening() { void stopListening() {
LOG.info("Stopping BQP listeners"); LOG.info("Stopping BQP listeners");
for (KeyAgreementListener l : listeners) { stopped = true;
l.close(); aliceLatch.countDown();
} for (KeyAgreementListener l : listeners) l.close();
listeners.clear(); connectionChooser.stop();
} }
@Nullable @Nullable
public KeyAgreementTransport connect(Payload remotePayload, public KeyAgreementTransport connect(Payload remotePayload, boolean alice) {
boolean alice) { // Let the ReadableTasks know if we are Alice
// Let the listeners know if we are Alice
this.connecting = true;
this.alice = alice; this.alice = alice;
long end = clock.currentTimeMillis() + CONNECTION_TIMEOUT; aliceLatch.countDown();
// Start connecting over supported transports // Start connecting over supported transports
LOG.info("Starting outgoing BQP connections"); if (LOG.isLoggable(INFO)) {
LOG.info("Starting outgoing BQP connections as "
+ (alice ? "Alice" : "Bob"));
}
for (TransportDescriptor d : remotePayload.getTransportDescriptors()) { for (TransportDescriptor d : remotePayload.getTransportDescriptors()) {
Plugin p = pluginManager.getPlugin(d.getId()); Plugin p = pluginManager.getPlugin(d.getId());
if (p instanceof DuplexPlugin) { if (p instanceof DuplexPlugin) {
if (LOG.isLoggable(INFO))
LOG.info("Connecting via " + d.getId());
DuplexPlugin plugin = (DuplexPlugin) p; DuplexPlugin plugin = (DuplexPlugin) p;
pending.add(connect.submit(new ReadableTask( byte[] commitment = remotePayload.getCommitment();
new ConnectorTask(plugin, remotePayload.getCommitment(), BdfList descriptor = d.getDescriptor();
d.getDescriptor(), end)))); connectionChooser.submit(new ReadableTask(
new ConnectorTask(plugin, commitment, descriptor)));
} }
} }
// Get chosen connection // Get chosen connection
KeyAgreementConnection chosen = null;
try { try {
long now = clock.currentTimeMillis(); KeyAgreementConnection chosen =
Future<KeyAgreementConnection> f = connectionChooser.poll(CONNECTION_TIMEOUT);
connect.poll(end - now, MILLISECONDS); if (chosen == null) return null;
if (f == null)
return null; // No task completed within the timeout.
chosen = f.get();
return new KeyAgreementTransport(chosen); return new KeyAgreementTransport(chosen);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for connection"); LOG.info("Interrupted while waiting for connection");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return null; return null;
} catch (ExecutionException | IOException e) { } catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null; return null;
} finally { } finally {
stopListening(); stopListening();
// Close all other connections
closePending(chosen);
} }
} }
private void closePending(@Nullable KeyAgreementConnection chosen) { private void waitingForAlice() {
for (Future<KeyAgreementConnection> f : pending) { if (!waitingSent.getAndSet(true)) callbacks.connectionWaiting();
try {
if (f.cancel(true)) {
LOG.info("Cancelled task");
} else if (!f.isCancelled()) {
KeyAgreementConnection c = f.get();
if (c != null && c != chosen)
tryToClose(c.getConnection(), false);
}
} catch (InterruptedException e) {
LOG.info("Interrupted while closing sockets");
Thread.currentThread().interrupt();
return;
} catch (ExecutionException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
}
private void tryToClose(DuplexTransportConnection conn, boolean exception) {
try {
if (LOG.isLoggable(INFO))
LOG.info("Closing connection, exception: " + exception);
conn.getReader().dispose(exception, true);
conn.getWriter().dispose(exception);
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
} }
private class ConnectorTask implements Callable<KeyAgreementConnection> { private class ConnectorTask implements Callable<KeyAgreementConnection> {
private final byte[] commitment; private final byte[] commitment;
private final BdfList descriptor; private final BdfList descriptor;
private final long end;
private final DuplexPlugin plugin; private final DuplexPlugin plugin;
private ConnectorTask(DuplexPlugin plugin, byte[] commitment, private ConnectorTask(DuplexPlugin plugin, byte[] commitment,
BdfList descriptor, long end) { BdfList descriptor) {
this.plugin = plugin; this.plugin = plugin;
this.commitment = commitment; this.commitment = commitment;
this.descriptor = descriptor; this.descriptor = descriptor;
this.end = end;
} }
@Nullable
@Override @Override
public KeyAgreementConnection call() throws Exception { public KeyAgreementConnection call() throws Exception {
// Repeat attempts until we connect, get interrupted, or time out // Repeat attempts until we connect, get stopped, or get interrupted
while (true) { while (!stopped) {
long now = clock.currentTimeMillis();
if (now > end) throw new IOException();
DuplexTransportConnection conn = DuplexTransportConnection conn =
plugin.createKeyAgreementConnection(commitment, plugin.createKeyAgreementConnection(commitment,
descriptor, end - now); descriptor);
if (conn != null) { if (conn != null) {
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))
LOG.info(plugin.getId().getString() + LOG.info(plugin.getId() + ": Outgoing connection");
": Outgoing connection");
return new KeyAgreementConnection(conn, plugin.getId()); return new KeyAgreementConnection(conn, plugin.getId());
} }
// Wait 2s before retry (to circumvent transient failures) // Wait 2s before retry (to circumvent transient failures)
Thread.sleep(2000); Thread.sleep(2000);
} }
return null;
} }
} }
private class ReadableTask private class ReadableTask implements Callable<KeyAgreementConnection> {
implements Callable<KeyAgreementConnection> {
private final Callable<KeyAgreementConnection> connectionTask; private final Callable<KeyAgreementConnection> connectionTask;
@@ -215,24 +177,23 @@ class KeyAgreementConnector {
this.connectionTask = connectionTask; this.connectionTask = connectionTask;
} }
@Nullable
@Override @Override
public KeyAgreementConnection call() throws Exception { public KeyAgreementConnection call() throws Exception {
KeyAgreementConnection c = connectionTask.call(); KeyAgreementConnection c = connectionTask.call();
if (c == null) return null;
aliceLatch.await();
if (alice || stopped) return c;
// Bob waits here for Alice to scan his QR code, determine her
// role, and send her key
InputStream in = c.getConnection().getReader().getInputStream(); InputStream in = c.getConnection().getReader().getInputStream();
boolean waitingSent = false; while (!stopped && in.available() == 0) {
while (!alice && in.available() == 0) { if (LOG.isLoggable(INFO))
if (!waitingSent && connecting && !alice) { LOG.info(c.getTransportId() + ": Waiting for data");
// Bob waits here until Alice obtains his payload. waitingForAlice();
callbacks.connectionWaiting(); Thread.sleep(500);
waitingSent = true;
}
if (LOG.isLoggable(INFO)) {
LOG.info(c.getTransportId().getString() +
": Waiting for connection");
}
Thread.sleep(1000);
} }
if (!alice && LOG.isLoggable(INFO)) if (!stopped && LOG.isLoggable(INFO))
LOG.info(c.getTransportId().getString() + ": Data available"); LOG.info(c.getTransportId().getString() + ": Data available");
return c; return c;
} }

View File

@@ -27,4 +27,10 @@ public class KeyAgreementModule {
PayloadParser providePayloadParser(BdfReaderFactory bdfReaderFactory) { PayloadParser providePayloadParser(BdfReaderFactory bdfReaderFactory) {
return new PayloadParserImpl(bdfReaderFactory); return new PayloadParserImpl(bdfReaderFactory);
} }
@Provides
ConnectionChooser provideConnectionChooser(
ConnectionChooserImpl connectionChooser) {
return connectionChooser;
}
} }

View File

@@ -99,7 +99,8 @@ class KeyAgreementProtocol {
PublicKey theirPublicKey; PublicKey theirPublicKey;
if (alice) { if (alice) {
sendKey(); sendKey();
// Alice waits here until Bob obtains her payload. // Alice waits here for Bob to scan her QR code, determine his
// role, receive her key and respond with his key
callbacks.connectionWaiting(); callbacks.connectionWaiting();
theirPublicKey = receiveKey(); theirPublicKey = receiveKey();
} else { } else {

View File

@@ -15,14 +15,11 @@ import org.briarproject.bramble.api.keyagreement.event.KeyAgreementFinishedEvent
import org.briarproject.bramble.api.keyagreement.event.KeyAgreementListeningEvent; import org.briarproject.bramble.api.keyagreement.event.KeyAgreementListeningEvent;
import org.briarproject.bramble.api.keyagreement.event.KeyAgreementStartedEvent; import org.briarproject.bramble.api.keyagreement.event.KeyAgreementStartedEvent;
import org.briarproject.bramble.api.keyagreement.event.KeyAgreementWaitingEvent; import org.briarproject.bramble.api.keyagreement.event.KeyAgreementWaitingEvent;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.MethodsNotNullByDefault; import org.briarproject.bramble.api.nullsafety.MethodsNotNullByDefault;
import org.briarproject.bramble.api.nullsafety.ParametersNotNullByDefault; import org.briarproject.bramble.api.nullsafety.ParametersNotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.system.Clock;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.inject.Inject; import javax.inject.Inject;
@@ -31,9 +28,8 @@ import static java.util.logging.Level.WARNING;
@MethodsNotNullByDefault @MethodsNotNullByDefault
@ParametersNotNullByDefault @ParametersNotNullByDefault
class KeyAgreementTaskImpl extends Thread implements class KeyAgreementTaskImpl extends Thread implements KeyAgreementTask,
KeyAgreementTask, KeyAgreementConnector.Callbacks, KeyAgreementProtocol.Callbacks, KeyAgreementConnector.Callbacks {
KeyAgreementProtocol.Callbacks {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(KeyAgreementTaskImpl.class.getName()); Logger.getLogger(KeyAgreementTaskImpl.class.getName());
@@ -49,17 +45,17 @@ class KeyAgreementTaskImpl extends Thread implements
private Payload remotePayload; private Payload remotePayload;
@Inject @Inject
KeyAgreementTaskImpl(Clock clock, CryptoComponent crypto, KeyAgreementTaskImpl(CryptoComponent crypto,
KeyAgreementCrypto keyAgreementCrypto, EventBus eventBus, KeyAgreementCrypto keyAgreementCrypto, EventBus eventBus,
PayloadEncoder payloadEncoder, PluginManager pluginManager, PayloadEncoder payloadEncoder, PluginManager pluginManager,
@IoExecutor Executor ioExecutor) { ConnectionChooser connectionChooser) {
this.crypto = crypto; this.crypto = crypto;
this.keyAgreementCrypto = keyAgreementCrypto; this.keyAgreementCrypto = keyAgreementCrypto;
this.eventBus = eventBus; this.eventBus = eventBus;
this.payloadEncoder = payloadEncoder; this.payloadEncoder = payloadEncoder;
localKeyPair = crypto.generateAgreementKeyPair(); localKeyPair = crypto.generateAgreementKeyPair();
connector = new KeyAgreementConnector(this, clock, keyAgreementCrypto, connector = new KeyAgreementConnector(this, keyAgreementCrypto,
pluginManager, ioExecutor); pluginManager, connectionChooser);
} }
@Override @Override
@@ -73,10 +69,8 @@ class KeyAgreementTaskImpl extends Thread implements
@Override @Override
public synchronized void stopListening() { public synchronized void stopListening() {
if (localPayload != null) { if (localPayload != null) {
if (remotePayload == null) if (remotePayload == null) connector.stopListening();
connector.stopListening(); else interrupt();
else
interrupt();
} }
} }

View File

@@ -28,7 +28,6 @@ import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -343,7 +342,7 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
if (!isRunning()) return null; if (!isRunning()) return null;
String address; String address;
try { try {
@@ -406,13 +405,10 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
} }
@Override @Override
public Callable<KeyAgreementConnection> listen() { public KeyAgreementConnection accept() throws IOException {
return () -> { DuplexTransportConnection conn = acceptConnection(ss);
DuplexTransportConnection conn = acceptConnection(ss); if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
if (LOG.isLoggable(INFO)) return new KeyAgreementConnection(conn, ID);
LOG.info(ID.getString() + ": Incoming connection");
return new KeyAgreementConnection(conn, ID);
};
} }
@Override @Override

View File

@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -224,7 +223,7 @@ class LanTcpPlugin extends TcpPlugin {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
if (!isRunning()) return null; if (!isRunning()) return null;
InetSocketAddress remote; InetSocketAddress remote;
try { try {
@@ -283,14 +282,11 @@ class LanTcpPlugin extends TcpPlugin {
} }
@Override @Override
public Callable<KeyAgreementConnection> listen() { public KeyAgreementConnection accept() throws IOException {
return () -> { Socket s = ss.accept();
Socket s = ss.accept(); if (LOG.isLoggable(INFO)) LOG.info(ID + ": Incoming connection");
if (LOG.isLoggable(INFO)) return new KeyAgreementConnection(new TcpTransportConnection(
LOG.info(ID.getString() + ": Incoming connection"); LanTcpPlugin.this, s), ID);
return new KeyAgreementConnection(
new TcpTransportConnection(LanTcpPlugin.this, s), ID);
};
} }
@Override @Override

View File

@@ -297,7 +297,7 @@ abstract class TcpPlugin implements DuplexPlugin {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin.tcp;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.data.BdfList; import org.briarproject.bramble.api.data.BdfList;
import org.briarproject.bramble.api.keyagreement.KeyAgreementConnection;
import org.briarproject.bramble.api.keyagreement.KeyAgreementListener; import org.briarproject.bramble.api.keyagreement.KeyAgreementListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.Backoff; import org.briarproject.bramble.api.plugin.Backoff;
@@ -26,11 +25,9 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
@@ -195,9 +192,16 @@ public class LanTcpPluginTest extends BrambleTestCase {
KeyAgreementListener kal = KeyAgreementListener kal =
plugin.createKeyAgreementListener(new byte[COMMIT_LENGTH]); plugin.createKeyAgreementListener(new byte[COMMIT_LENGTH]);
assertNotNull(kal); assertNotNull(kal);
Callable<KeyAgreementConnection> c = kal.listen(); CountDownLatch latch = new CountDownLatch(1);
FutureTask<KeyAgreementConnection> f = new FutureTask<>(c); AtomicBoolean error = new AtomicBoolean(false);
new Thread(f).start(); new Thread(() -> {
try {
kal.accept();
latch.countDown();
} catch (IOException e) {
error.set(true);
}
}).start();
// The plugin should have bound a socket and stored the port number // The plugin should have bound a socket and stored the port number
BdfList descriptor = kal.getDescriptor(); BdfList descriptor = kal.getDescriptor();
assertEquals(3, descriptor.size()); assertEquals(3, descriptor.size());
@@ -213,10 +217,12 @@ public class LanTcpPluginTest extends BrambleTestCase {
InetSocketAddress socketAddr = new InetSocketAddress(addr, port); InetSocketAddress socketAddr = new InetSocketAddress(addr, port);
Socket s = new Socket(); Socket s = new Socket();
s.connect(socketAddr, 100); s.connect(socketAddr, 100);
assertNotNull(f.get(5, SECONDS)); // Check that the connection was accepted
assertTrue(latch.await(5, SECONDS));
assertFalse(error.get());
// Clean up
s.close(); s.close();
kal.close(); kal.close();
// Stop the plugin
plugin.stop(); plugin.stop();
} }
@@ -262,7 +268,7 @@ public class LanTcpPluginTest extends BrambleTestCase {
descriptor.add(local.getPort()); descriptor.add(local.getPort());
// Connect to the port // Connect to the port
DuplexTransportConnection d = plugin.createKeyAgreementConnection( DuplexTransportConnection d = plugin.createKeyAgreementConnection(
new byte[COMMIT_LENGTH], descriptor, 5000); new byte[COMMIT_LENGTH], descriptor);
assertNotNull(d); assertNotNull(d);
// Check that the connection was accepted // Check that the connection was accepted
assertTrue(latch.await(5, SECONDS)); assertTrue(latch.await(5, SECONDS));

View File

@@ -177,7 +177,7 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
@Override @Override
public DuplexTransportConnection createKeyAgreementConnection( public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, BdfList descriptor, long timeout) { byte[] commitment, BdfList descriptor) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }