mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 18:59:06 +01:00
Fixed race condition when closing redundant sockets.
When more than one invitation socket is opened, Alice should pick which one to use and Bob should use whichever one Alice picks. This fixes a race condition where each party picked a different socket and closed the other.
This commit is contained in:
@@ -8,18 +8,24 @@ import org.briarproject.api.plugins.duplex.DuplexPlugin;
|
||||
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
|
||||
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
|
||||
import org.briarproject.api.properties.TransportProperties;
|
||||
import org.briarproject.api.system.Clock;
|
||||
import org.briarproject.util.LatchedReference;
|
||||
import org.briarproject.util.OsUtils;
|
||||
import org.briarproject.util.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
@@ -30,6 +36,7 @@ import javax.microedition.io.Connector;
|
||||
import javax.microedition.io.StreamConnection;
|
||||
import javax.microedition.io.StreamConnectionNotifier;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static javax.bluetooth.DiscoveryAgent.GIAC;
|
||||
@@ -45,7 +52,6 @@ class BluetoothPlugin implements DuplexPlugin {
|
||||
|
||||
private final Executor ioExecutor;
|
||||
private final SecureRandom secureRandom;
|
||||
private final Clock clock;
|
||||
private final Backoff backoff;
|
||||
private final DuplexPluginCallback callback;
|
||||
private final int maxLatency;
|
||||
@@ -55,11 +61,10 @@ class BluetoothPlugin implements DuplexPlugin {
|
||||
private volatile StreamConnectionNotifier socket = null;
|
||||
private volatile LocalDevice localDevice = null;
|
||||
|
||||
BluetoothPlugin(Executor ioExecutor, SecureRandom secureRandom, Clock clock,
|
||||
BluetoothPlugin(Executor ioExecutor, SecureRandom secureRandom,
|
||||
Backoff backoff, DuplexPluginCallback callback, int maxLatency) {
|
||||
this.ioExecutor = ioExecutor;
|
||||
this.secureRandom = secureRandom;
|
||||
this.clock = clock;
|
||||
this.backoff = backoff;
|
||||
this.callback = callback;
|
||||
this.maxLatency = maxLatency;
|
||||
@@ -246,7 +251,7 @@ class BluetoothPlugin implements DuplexPlugin {
|
||||
}
|
||||
|
||||
public DuplexTransportConnection createInvitationConnection(PseudoRandom r,
|
||||
long timeout) {
|
||||
long timeout, boolean alice) {
|
||||
if (!running) return null;
|
||||
// Use the invitation codes to generate the UUID
|
||||
byte[] b = r.nextBytes(UUID_BYTES);
|
||||
@@ -266,23 +271,67 @@ class BluetoothPlugin implements DuplexPlugin {
|
||||
tryToClose(ss);
|
||||
return null;
|
||||
}
|
||||
// Start the background threads
|
||||
LatchedReference<StreamConnection> socketLatch =
|
||||
new LatchedReference<StreamConnection>();
|
||||
new DiscoveryThread(socketLatch, uuid, timeout).start();
|
||||
new BluetoothListenerThread(socketLatch, ss).start();
|
||||
// Wait for an incoming or outgoing connection
|
||||
try {
|
||||
StreamConnection s = socketLatch.waitForReference(timeout);
|
||||
if (s != null) return new BluetoothTransportConnection(this, s);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warning("Interrupted while exchanging invitations");
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
// Closing the socket will terminate the listener thread
|
||||
tryToClose(ss);
|
||||
// Create the background tasks
|
||||
CompletionService<StreamConnection> complete =
|
||||
new ExecutorCompletionService<StreamConnection>(ioExecutor);
|
||||
List<Future<StreamConnection>> futures =
|
||||
new ArrayList<Future<StreamConnection>>();
|
||||
if (alice) {
|
||||
// Return the first connected socket
|
||||
futures.add(complete.submit(new ListeningTask(ss)));
|
||||
futures.add(complete.submit(new DiscoveryTask(uuid)));
|
||||
} else {
|
||||
// Return the first socket with readable data
|
||||
futures.add(complete.submit(new ReadableTask(
|
||||
new ListeningTask(ss))));
|
||||
futures.add(complete.submit(new ReadableTask(
|
||||
new DiscoveryTask(uuid))));
|
||||
}
|
||||
return null;
|
||||
StreamConnection chosen = null;
|
||||
try {
|
||||
Future<StreamConnection> f = complete.poll(timeout, MILLISECONDS);
|
||||
if (f == null) return null; // No task completed within the timeout
|
||||
chosen = f.get();
|
||||
return new BluetoothTransportConnection(this, chosen);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for connection");
|
||||
return null;
|
||||
} catch (ExecutionException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return null;
|
||||
} finally {
|
||||
// Closing the socket will terminate the listener task
|
||||
tryToClose(ss);
|
||||
closeSockets(futures, chosen);
|
||||
}
|
||||
}
|
||||
|
||||
private void closeSockets(final List<Future<StreamConnection>> futures,
|
||||
final StreamConnection chosen) {
|
||||
ioExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
for (Future<StreamConnection> f : futures) {
|
||||
try {
|
||||
if (f.cancel(true)) {
|
||||
LOG.info("Cancelled task");
|
||||
} else {
|
||||
StreamConnection s = f.get();
|
||||
if (s != null && s != chosen) {
|
||||
LOG.info("Closing unwanted socket");
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted while closing sockets");
|
||||
return;
|
||||
} catch (ExecutionException e) {
|
||||
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
|
||||
} catch (IOException e) {
|
||||
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void makeDeviceDiscoverable() {
|
||||
@@ -294,93 +343,74 @@ class BluetoothPlugin implements DuplexPlugin {
|
||||
}
|
||||
}
|
||||
|
||||
private class DiscoveryThread extends Thread {
|
||||
private class DiscoveryTask implements Callable<StreamConnection> {
|
||||
|
||||
private final LatchedReference<StreamConnection> socketLatch;
|
||||
private final String uuid;
|
||||
private final long timeout;
|
||||
|
||||
private DiscoveryThread(LatchedReference<StreamConnection> socketLatch,
|
||||
String uuid, long timeout) {
|
||||
this.socketLatch = socketLatch;
|
||||
private DiscoveryTask(String uuid) {
|
||||
this.uuid = uuid;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
public StreamConnection call() throws Exception {
|
||||
// Repeat discovery until we connect or get interrupted
|
||||
DiscoveryAgent discoveryAgent = localDevice.getDiscoveryAgent();
|
||||
long now = clock.currentTimeMillis();
|
||||
long end = now + timeout;
|
||||
while (now < end && running && !socketLatch.isSet()) {
|
||||
if (!discoverySemaphore.tryAcquire()) {
|
||||
LOG.info("Another device discovery is in progress");
|
||||
return;
|
||||
}
|
||||
while (true) {
|
||||
if (!discoverySemaphore.tryAcquire())
|
||||
throw new Exception("Discovery is already in progress");
|
||||
try {
|
||||
InvitationListener listener =
|
||||
new InvitationListener(discoveryAgent, uuid);
|
||||
discoveryAgent.startInquiry(GIAC, listener);
|
||||
String url = listener.waitForUrl();
|
||||
if (url == null) continue;
|
||||
StreamConnection s = connect(url);
|
||||
if (s == null) continue;
|
||||
LOG.info("Outgoing connection");
|
||||
if (!socketLatch.set(s)) {
|
||||
LOG.info("Closing redundant connection");
|
||||
tryToClose(s);
|
||||
if (url != null) {
|
||||
StreamConnection s = connect(url);
|
||||
if (s != null) {
|
||||
LOG.info("Outgoing connection");
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return;
|
||||
} catch (BluetoothStateException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
return;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warning("Interrupted while waiting for URL");
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} finally {
|
||||
discoverySemaphore.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void tryToClose(StreamConnection s) {
|
||||
try {
|
||||
if (s != null) s.close();
|
||||
} catch (IOException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class BluetoothListenerThread extends Thread {
|
||||
private static class ListeningTask implements Callable<StreamConnection> {
|
||||
|
||||
private final LatchedReference<StreamConnection> socketLatch;
|
||||
private final StreamConnectionNotifier serverSocket;
|
||||
|
||||
private BluetoothListenerThread(
|
||||
LatchedReference<StreamConnection> socketLatch,
|
||||
StreamConnectionNotifier serverSocket) {
|
||||
this.socketLatch = socketLatch;
|
||||
private ListeningTask(StreamConnectionNotifier serverSocket) {
|
||||
this.serverSocket = serverSocket;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Listening for invitation connections");
|
||||
// Listen until a connection is received or the socket is closed
|
||||
try {
|
||||
StreamConnection s = serverSocket.acceptAndOpen();
|
||||
LOG.info("Incoming connection");
|
||||
if (!socketLatch.set(s)) {
|
||||
LOG.info("Closing redundant connection");
|
||||
s.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// This is expected when the socket is closed
|
||||
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
|
||||
public StreamConnection call() throws Exception {
|
||||
StreamConnection s = serverSocket.acceptAndOpen();
|
||||
LOG.info("Incoming connection");
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ReadableTask implements Callable<StreamConnection> {
|
||||
|
||||
private final Callable<StreamConnection> connectionTask;
|
||||
|
||||
private ReadableTask(Callable<StreamConnection> connectionTask) {
|
||||
this.connectionTask = connectionTask;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamConnection call() throws Exception {
|
||||
StreamConnection s = connectionTask.call();
|
||||
InputStream in = s.openInputStream();
|
||||
while (in.available() == 0) {
|
||||
LOG.info("Waiting for data");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
LOG.info("Data available");
|
||||
return s;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user