Debugging Bluetooth threading issues.

This commit is contained in:
akwizgran
2011-12-09 22:20:32 +00:00
parent 4671b50b37
commit 5ba5887565
17 changed files with 59 additions and 57 deletions

View File

@@ -1,9 +1,9 @@
package net.sf.briar.api.plugins; package net.sf.briar.api.plugins;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
public interface BatchPluginFactory { public interface BatchPluginFactory {
BatchPlugin createPlugin(ScheduledExecutorService pluginExecutor, BatchPlugin createPlugin(Executor pluginExecutor,
BatchPluginCallback callback); BatchPluginCallback callback);
} }

View File

@@ -1,9 +1,9 @@
package net.sf.briar.api.plugins; package net.sf.briar.api.plugins;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
public interface StreamPluginFactory { public interface StreamPluginFactory {
StreamPlugin createPlugin(ScheduledExecutorService pluginExecutor, StreamPlugin createPlugin(Executor pluginExecutor,
StreamPluginCallback callback); StreamPluginCallback callback);
} }

View File

@@ -7,7 +7,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ExecutorService;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -51,7 +51,7 @@ class PluginManagerImpl implements PluginManager {
"net.sf.briar.plugins.socket.SimpleSocketPluginFactory" "net.sf.briar.plugins.socket.SimpleSocketPluginFactory"
}; };
private final ScheduledExecutorService pluginExecutor; private final ExecutorService pluginExecutor;
private final DatabaseComponent db; private final DatabaseComponent db;
private final Poller poller; private final Poller poller;
private final ConnectionDispatcher dispatcher; private final ConnectionDispatcher dispatcher;
@@ -60,7 +60,7 @@ class PluginManagerImpl implements PluginManager {
private final List<StreamPlugin> streamPlugins; // Locking: this private final List<StreamPlugin> streamPlugins; // Locking: this
@Inject @Inject
PluginManagerImpl(@PluginExecutor ScheduledExecutorService pluginExecutor, PluginManagerImpl(@PluginExecutor ExecutorService pluginExecutor,
DatabaseComponent db, Poller poller, DatabaseComponent db, Poller poller,
ConnectionDispatcher dispatcher, UiCallback uiCallback) { ConnectionDispatcher dispatcher, UiCallback uiCallback) {
this.pluginExecutor = pluginExecutor; this.pluginExecutor = pluginExecutor;

View File

@@ -1,7 +1,7 @@
package net.sf.briar.plugins; package net.sf.briar.plugins;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import net.sf.briar.api.plugins.PluginExecutor; import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.PluginManager; import net.sf.briar.api.plugins.PluginManager;
@@ -13,9 +13,9 @@ public class PluginsModule extends AbstractModule {
@Override @Override
protected void configure() { protected void configure() {
bind(ScheduledExecutorService.class).annotatedWith( bind(ExecutorService.class).annotatedWith(
PluginExecutor.class).toInstance( PluginExecutor.class).toInstance(
Executors.newScheduledThreadPool(1)); Executors.newCachedThreadPool());
bind(PluginManager.class).to( bind(PluginManager.class).to(
PluginManagerImpl.class).in(Singleton.class); PluginManagerImpl.class).in(Singleton.class);
bind(Poller.class).to(PollerImpl.class); bind(Poller.class).to(PollerImpl.class);

View File

@@ -4,10 +4,11 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -41,23 +42,25 @@ class BluetoothPlugin implements StreamPlugin {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(BluetoothPlugin.class.getName()); Logger.getLogger(BluetoothPlugin.class.getName());
private final ScheduledExecutorService pluginExecutor; private final Executor pluginExecutor;
private final StreamPluginCallback callback; private final StreamPluginCallback callback;
private final long pollingInterval; private final long pollingInterval;
private final Object discoveryLock = new Object(); private final Object discoveryLock = new Object();
private final ScheduledExecutorService scheduler;
// Locking: this // Locking: this
private final Collection<StreamConnectionNotifier> invitationSockets; private final Map<StreamConnectionNotifier, ScheduledFuture<?>> sockets;
private boolean running = false; // Locking: this private boolean running = false; // Locking: this
private LocalDevice localDevice = null; // Locking: this private LocalDevice localDevice = null; // Locking: this
private StreamConnectionNotifier socket = null; // Locking: this private StreamConnectionNotifier socket = null; // Locking: this
BluetoothPlugin(@PluginExecutor ScheduledExecutorService pluginExecutor, BluetoothPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback, long pollingInterval) { StreamPluginCallback callback, long pollingInterval) {
this.pluginExecutor = pluginExecutor; this.pluginExecutor = pluginExecutor;
this.callback = callback; this.callback = callback;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;
invitationSockets = new HashSet<StreamConnectionNotifier>(); scheduler = Executors.newScheduledThreadPool(0);
sockets = new HashMap<StreamConnectionNotifier, ScheduledFuture<?>>();
} }
public TransportId getId() { public TransportId getId() {
@@ -173,8 +176,11 @@ class BluetoothPlugin implements StreamPlugin {
public synchronized void stop() { public synchronized void stop() {
running = false; running = false;
localDevice = null; localDevice = null;
for(StreamConnectionNotifier scn : invitationSockets) tryToClose(scn); for(Entry<StreamConnectionNotifier, ScheduledFuture<?>> e
invitationSockets.clear(); : sockets.entrySet()) {
if(e.getValue().cancel(false)) tryToClose(e.getKey());
}
sockets.clear();
if(socket != null) { if(socket != null) {
tryToClose(socket); tryToClose(socket);
socket = null; socket = null;
@@ -383,31 +389,31 @@ class BluetoothPlugin implements StreamPlugin {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
return; return;
} }
synchronized(this) {
if(!running) {
tryToClose(scn);
return;
}
invitationSockets.add(scn);
}
// Close the socket when the invitation times out // Close the socket when the invitation times out
Runnable close = new Runnable() { Runnable close = new Runnable() {
public void run() { public void run() {
synchronized(this) { synchronized(this) {
invitationSockets.remove(scn); sockets.remove(scn);
} }
tryToClose(scn); tryToClose(scn);
} }
}; };
ScheduledFuture<?> future = pluginExecutor.schedule(close, ScheduledFuture<?> future = scheduler.schedule(close,
c.getTimeout(), TimeUnit.MILLISECONDS); c.getTimeout(), TimeUnit.MILLISECONDS);
synchronized(this) {
if(!running) {
if(future.cancel(false)) tryToClose(scn);
return;
}
sockets.put(scn, future);
}
// Try to accept a connection // Try to accept a connection
try { try {
StreamConnection s = scn.acceptAndOpen(); StreamConnection s = scn.acceptAndOpen();
// Close the socket and return the connection // Close the socket and return the connection
if(future.cancel(false)) { if(future.cancel(false)) {
synchronized(this) { synchronized(this) {
invitationSockets.remove(scn); sockets.remove(scn);
} }
tryToClose(scn); tryToClose(scn);
} }

View File

@@ -1,6 +1,6 @@
package net.sf.briar.plugins.bluetooth; package net.sf.briar.plugins.bluetooth;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import net.sf.briar.api.plugins.PluginExecutor; import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPlugin; import net.sf.briar.api.plugins.StreamPlugin;
@@ -11,8 +11,7 @@ public class BluetoothPluginFactory implements StreamPluginFactory {
private static final long POLLING_INTERVAL = 3L * 60L * 1000L; // 3 mins private static final long POLLING_INTERVAL = 3L * 60L * 1000L; // 3 mins
public StreamPlugin createPlugin( public StreamPlugin createPlugin(@PluginExecutor Executor pluginExecutor,
@PluginExecutor ScheduledExecutorService pluginExecutor,
StreamPluginCallback callback) { StreamPluginCallback callback) {
return new BluetoothPlugin(pluginExecutor, callback, POLLING_INTERVAL); return new BluetoothPlugin(pluginExecutor, callback, POLLING_INTERVAL);
} }

View File

@@ -1,6 +1,6 @@
package net.sf.briar.plugins.file; package net.sf.briar.plugins.file;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import net.sf.briar.api.plugins.BatchPlugin; import net.sf.briar.api.plugins.BatchPlugin;
import net.sf.briar.api.plugins.BatchPluginCallback; import net.sf.briar.api.plugins.BatchPluginCallback;
@@ -12,8 +12,7 @@ public class RemovableDrivePluginFactory implements BatchPluginFactory {
private static final long POLLING_INTERVAL = 10L * 1000L; // 10 seconds private static final long POLLING_INTERVAL = 10L * 1000L; // 10 seconds
public BatchPlugin createPlugin( public BatchPlugin createPlugin(@PluginExecutor Executor pluginExecutor,
@PluginExecutor ScheduledExecutorService pluginExecutor,
BatchPluginCallback callback) { BatchPluginCallback callback) {
RemovableDriveFinder finder; RemovableDriveFinder finder;
RemovableDriveMonitor monitor; RemovableDriveMonitor monitor;

View File

@@ -10,7 +10,7 @@ import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -25,7 +25,7 @@ class LanSocketPlugin extends SimpleSocketPlugin {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(LanSocketPlugin.class.getName()); Logger.getLogger(LanSocketPlugin.class.getName());
LanSocketPlugin(@PluginExecutor ScheduledExecutorService pluginExecutor, LanSocketPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback, long pollingInterval) { StreamPluginCallback callback, long pollingInterval) {
super(pluginExecutor, callback, pollingInterval); super(pluginExecutor, callback, pollingInterval);
} }

View File

@@ -9,7 +9,7 @@ import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -31,7 +31,7 @@ class SimpleSocketPlugin extends SocketPlugin {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(SimpleSocketPlugin.class.getName()); Logger.getLogger(SimpleSocketPlugin.class.getName());
SimpleSocketPlugin(@PluginExecutor ScheduledExecutorService pluginExecutor, SimpleSocketPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback, long pollingInterval) { StreamPluginCallback callback, long pollingInterval) {
super(pluginExecutor, callback, pollingInterval); super(pluginExecutor, callback, pollingInterval);
} }

View File

@@ -1,6 +1,6 @@
package net.sf.briar.plugins.socket; package net.sf.briar.plugins.socket;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import net.sf.briar.api.plugins.PluginExecutor; import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPlugin; import net.sf.briar.api.plugins.StreamPlugin;
@@ -11,8 +11,7 @@ public class SimpleSocketPluginFactory implements StreamPluginFactory {
private static final long POLLING_INTERVAL = 5L * 60L * 1000L; // 5 mins private static final long POLLING_INTERVAL = 5L * 60L * 1000L; // 5 mins
public StreamPlugin createPlugin( public StreamPlugin createPlugin(@PluginExecutor Executor pluginExecutor,
@PluginExecutor ScheduledExecutorService pluginExecutor,
StreamPluginCallback callback) { StreamPluginCallback callback) {
return new SimpleSocketPlugin(pluginExecutor, callback, return new SimpleSocketPlugin(pluginExecutor, callback,
POLLING_INTERVAL); POLLING_INTERVAL);

View File

@@ -6,7 +6,7 @@ import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -22,7 +22,7 @@ abstract class SocketPlugin implements StreamPlugin {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(SocketPlugin.class.getName()); Logger.getLogger(SocketPlugin.class.getName());
protected final ScheduledExecutorService pluginExecutor; protected final Executor pluginExecutor;
protected final StreamPluginCallback callback; protected final StreamPluginCallback callback;
private final long pollingInterval; private final long pollingInterval;
@@ -37,8 +37,7 @@ abstract class SocketPlugin implements StreamPlugin {
protected abstract SocketAddress getLocalSocketAddress(); protected abstract SocketAddress getLocalSocketAddress();
protected abstract SocketAddress getRemoteSocketAddress(ContactId c); protected abstract SocketAddress getRemoteSocketAddress(ContactId c);
protected SocketPlugin( protected SocketPlugin(@PluginExecutor Executor pluginExecutor,
@PluginExecutor ScheduledExecutorService pluginExecutor,
StreamPluginCallback callback, long pollingInterval) { StreamPluginCallback callback, long pollingInterval) {
this.pluginExecutor = pluginExecutor; this.pluginExecutor = pluginExecutor;
this.callback = callback; this.callback = callback;

View File

@@ -1,8 +1,8 @@
package net.sf.briar.plugins; package net.sf.briar.plugins;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import net.sf.briar.BriarTestCase; import net.sf.briar.BriarTestCase;
@@ -43,7 +43,7 @@ public class PluginManagerImplTest extends BriarTestCase {
with(any(TransportProperties.class))); with(any(TransportProperties.class)));
oneOf(poller).stop(); oneOf(poller).stop();
}}); }});
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); ExecutorService executor = Executors.newCachedThreadPool();
PluginManagerImpl p = new PluginManagerImpl(executor, db, poller, PluginManagerImpl p = new PluginManagerImpl(executor, db, poller,
dispatcher, uiCallback); dispatcher, uiCallback);
// We expect either 2 or 3 plugins to be started, depending on whether // We expect either 2 or 3 plugins to be started, depending on whether

View File

@@ -3,7 +3,7 @@ package net.sf.briar.plugins.bluetooth;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import net.sf.briar.api.ContactId; import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportConfig; import net.sf.briar.api.TransportConfig;
@@ -24,7 +24,7 @@ public class BluetoothClientTest extends StreamClientTest {
// Create the plugin // Create the plugin
callback = new ClientCallback(new TransportConfig(), callback = new ClientCallback(new TransportConfig(),
new TransportProperties(), remote); new TransportProperties(), remote);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Executor executor = Executors.newCachedThreadPool();
plugin = new BluetoothPlugin(executor, callback, 0L); plugin = new BluetoothPlugin(executor, callback, 0L);
} }

View File

@@ -2,7 +2,7 @@ package net.sf.briar.plugins.bluetooth;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import net.sf.briar.api.TransportConfig; import net.sf.briar.api.TransportConfig;
import net.sf.briar.api.TransportProperties; import net.sf.briar.api.TransportProperties;
@@ -19,7 +19,7 @@ public class BluetoothServerTest extends StreamServerTest {
// Create the plugin // Create the plugin
callback = new ServerCallback(new TransportConfig(), local, callback = new ServerCallback(new TransportConfig(), local,
Collections.singletonMap(contactId, new TransportProperties())); Collections.singletonMap(contactId, new TransportProperties()));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Executor executor = Executors.newCachedThreadPool();
plugin = new BluetoothPlugin(executor, callback, 0L); plugin = new BluetoothPlugin(executor, callback, 0L);
} }

View File

@@ -3,7 +3,7 @@ package net.sf.briar.plugins.socket;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import net.sf.briar.api.ContactId; import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportConfig; import net.sf.briar.api.TransportConfig;
@@ -24,7 +24,7 @@ public class LanSocketClientTest extends StreamClientTest {
// Create the plugin // Create the plugin
callback = new ClientCallback(new TransportConfig(), callback = new ClientCallback(new TransportConfig(),
new TransportProperties(), remote); new TransportProperties(), remote);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Executor executor = Executors.newCachedThreadPool();
plugin = new LanSocketPlugin(executor, callback, 0L); plugin = new LanSocketPlugin(executor, callback, 0L);
} }

View File

@@ -2,7 +2,7 @@ package net.sf.briar.plugins.socket;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import net.sf.briar.api.TransportConfig; import net.sf.briar.api.TransportConfig;
import net.sf.briar.api.TransportProperties; import net.sf.briar.api.TransportProperties;
@@ -16,7 +16,7 @@ public class LanSocketServerTest extends StreamServerTest {
callback = new ServerCallback(new TransportConfig(), callback = new ServerCallback(new TransportConfig(),
new TransportProperties(), new TransportProperties(),
Collections.singletonMap(contactId, new TransportProperties())); Collections.singletonMap(contactId, new TransportProperties()));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Executor executor = Executors.newCachedThreadPool();
plugin = new LanSocketPlugin(executor, callback, 0L); plugin = new LanSocketPlugin(executor, callback, 0L);
} }

View File

@@ -8,7 +8,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +30,7 @@ public class SimpleSocketPluginTest extends BriarTestCase {
StreamCallback callback = new StreamCallback(); StreamCallback callback = new StreamCallback();
callback.local.put("internal", "127.0.0.1"); callback.local.put("internal", "127.0.0.1");
callback.local.put("port", "0"); callback.local.put("port", "0");
ScheduledExecutorService e = Executors.newScheduledThreadPool(1); Executor e = Executors.newCachedThreadPool();
SimpleSocketPlugin plugin = new SimpleSocketPlugin(e, callback, 0L); SimpleSocketPlugin plugin = new SimpleSocketPlugin(e, callback, 0L);
plugin.start(); plugin.start();
// The plugin should have bound a socket and stored the port number // The plugin should have bound a socket and stored the port number
@@ -64,7 +64,7 @@ public class SimpleSocketPluginTest extends BriarTestCase {
@Test @Test
public void testOutgoingConnection() throws Exception { public void testOutgoingConnection() throws Exception {
StreamCallback callback = new StreamCallback(); StreamCallback callback = new StreamCallback();
ScheduledExecutorService e = Executors.newScheduledThreadPool(1); Executor e = Executors.newCachedThreadPool();
SimpleSocketPlugin plugin = new SimpleSocketPlugin(e, callback, 0L); SimpleSocketPlugin plugin = new SimpleSocketPlugin(e, callback, 0L);
plugin.start(); plugin.start();
// Listen on a local port // Listen on a local port