Plugins should use the executor rather than creating threads.

This commit is contained in:
akwizgran
2011-12-08 16:57:24 +00:00
parent c6b6c20205
commit 9f0b865ba8
7 changed files with 59 additions and 90 deletions

View File

@@ -126,7 +126,11 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
} }
socket = scn; socket = scn;
} }
startContactAccepterThread(); pluginExecutor.execute(new Runnable() {
public void run() {
acceptContactConnections();
}
});
} }
private synchronized String getUuid() { private synchronized String getUuid() {
@@ -160,15 +164,6 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
} }
} }
private void startContactAccepterThread() {
new Thread() {
@Override
public void run() {
acceptContactConnections();
}
}.start();
}
private void acceptContactConnections() { private void acceptContactConnections() {
while(true) { while(true) {
StreamConnectionNotifier scn; StreamConnectionNotifier scn;
@@ -299,9 +294,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
// The invitee's device may not be discoverable, so both parties must // The invitee's device may not be discoverable, so both parties must
// try to initiate connections // try to initiate connections
String uuid = convertInvitationCodeToUuid(code); String uuid = convertInvitationCodeToUuid(code);
ConnectionCallback c = new ConnectionCallback(uuid, timeout); final ConnectionCallback c = new ConnectionCallback(uuid, timeout);
startOutgoingInvitationThread(c); pluginExecutor.execute(new Runnable() {
startIncomingInvitationThread(c); public void run() {
createInvitationConnection(c);
}
});
pluginExecutor.execute(new Runnable() {
public void run() {
bindInvitationSocket(c);
}
});
try { try {
StreamConnection s = c.waitForConnection(); StreamConnection s = c.waitForConnection();
return s == null ? null : new BluetoothTransportConnection(s); return s == null ? null : new BluetoothTransportConnection(s);
@@ -319,15 +322,6 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
return StringUtils.toHexString(b); return StringUtils.toHexString(b);
} }
private void startOutgoingInvitationThread(final ConnectionCallback c) {
new Thread() {
@Override
public void run() {
createInvitationConnection(c);
}
}.start();
}
private void createInvitationConnection(ConnectionCallback c) { private void createInvitationConnection(ConnectionCallback c) {
DiscoveryAgent discoveryAgent; DiscoveryAgent discoveryAgent;
synchronized(this) { synchronized(this) {
@@ -369,30 +363,25 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
} }
} }
private void startIncomingInvitationThread(final ConnectionCallback c) { private void bindInvitationSocket(final ConnectionCallback c) {
new Thread() {
@Override
public void run() {
bindInvitationSocket(c);
}
}.start();
}
private void bindInvitationSocket(ConnectionCallback c) {
synchronized(this) { synchronized(this) {
if(!started) return; if(!started) return;
makeDeviceDiscoverable(); makeDeviceDiscoverable();
} }
// Bind the socket // Bind the socket
String url = "btspp://localhost:" + c.getUuid() + ";name=RFCOMM"; String url = "btspp://localhost:" + c.getUuid() + ";name=RFCOMM";
StreamConnectionNotifier scn; final StreamConnectionNotifier scn;
try { try {
scn = (StreamConnectionNotifier) Connector.open(url); scn = (StreamConnectionNotifier) Connector.open(url);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
return; return;
} }
startInvitationAccepterThread(c, scn); pluginExecutor.execute(new Runnable() {
public void run() {
acceptInvitationConnection(c, scn);
}
});
// Close the socket when the invitation times out // Close the socket when the invitation times out
try { try {
Thread.sleep(c.getTimeout()); Thread.sleep(c.getTimeout());
@@ -408,16 +397,6 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
} }
} }
private void startInvitationAccepterThread(final ConnectionCallback c,
final StreamConnectionNotifier scn) {
new Thread() {
@Override
public void run() {
acceptInvitationConnection(c, scn);
}
}.start();
}
private void acceptInvitationConnection(ConnectionCallback c, private void acceptInvitationConnection(ConnectionCallback c,
StreamConnectionNotifier scn) { StreamConnectionNotifier scn) {
synchronized(this) { synchronized(this) {

View File

@@ -3,14 +3,18 @@ package net.sf.briar.plugins.file;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import net.sf.briar.api.plugins.PluginExecutor;
class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable { class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(PollingRemovableDriveMonitor.class.getName()); Logger.getLogger(PollingRemovableDriveMonitor.class.getName());
private final Executor pluginExecutor;
private final RemovableDriveFinder finder; private final RemovableDriveFinder finder;
private final long pollingInterval; private final long pollingInterval;
private final Object pollingLock = new Object(); private final Object pollingLock = new Object();
@@ -19,8 +23,9 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
private volatile Callback callback = null; private volatile Callback callback = null;
private volatile IOException exception = null; private volatile IOException exception = null;
public PollingRemovableDriveMonitor(RemovableDriveFinder finder, public PollingRemovableDriveMonitor(@PluginExecutor Executor pluginExecutor,
long pollingInterval) { RemovableDriveFinder finder, long pollingInterval) {
this.pluginExecutor = pluginExecutor;
this.finder = finder; this.finder = finder;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;
} }
@@ -29,7 +34,7 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
if(running) throw new IllegalStateException(); if(running) throw new IllegalStateException();
running = true; running = true;
this.callback = callback; this.callback = callback;
new Thread(this).start(); pluginExecutor.execute(this);
} }
public synchronized void stop() throws IOException { public synchronized void stop() throws IOException {
@@ -50,14 +55,7 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
Collection<File> drives = finder.findRemovableDrives(); Collection<File> drives = finder.findRemovableDrives();
while(running) { while(running) {
synchronized(pollingLock) { synchronized(pollingLock) {
try { pollingLock.wait(pollingInterval);
pollingLock.wait(pollingInterval);
} catch(InterruptedException e) {
if(LOG.isLoggable(Level.INFO))
LOG.info("Interrupted while waiting to poll");
Thread.currentThread().interrupt();
return;
}
} }
if(!running) return; if(!running) return;
Collection<File> newDrives = finder.findRemovableDrives(); Collection<File> newDrives = finder.findRemovableDrives();
@@ -66,6 +64,10 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
} }
drives = newDrives; drives = newDrives;
} }
} catch(InterruptedException e) {
if(LOG.isLoggable(Level.INFO))
LOG.info("Interrupted while waiting to poll");
Thread.currentThread().interrupt();
} catch(IOException e) { } catch(IOException e) {
exception = e; exception = e;
} }

View File

@@ -25,11 +25,11 @@ public class RemovableDrivePluginFactory implements BatchPluginFactory {
} else if(OsUtils.isMac()) { } else if(OsUtils.isMac()) {
// JNotify requires OS X 10.5 or newer, so we have to poll // JNotify requires OS X 10.5 or newer, so we have to poll
finder = new MacRemovableDriveFinder(); finder = new MacRemovableDriveFinder();
monitor = new PollingRemovableDriveMonitor(finder, monitor = new PollingRemovableDriveMonitor(pluginExecutor, finder,
POLLING_INTERVAL); POLLING_INTERVAL);
} else if(OsUtils.isWindows()) { } else if(OsUtils.isWindows()) {
finder = new WindowsRemovableDriveFinder(); finder = new WindowsRemovableDriveFinder();
monitor = new PollingRemovableDriveMonitor(finder, monitor = new PollingRemovableDriveMonitor(pluginExecutor, finder,
POLLING_INTERVAL); POLLING_INTERVAL);
} else { } else {
return null; return null;

View File

@@ -42,15 +42,11 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
@Override @Override
public synchronized void start() throws IOException { public synchronized void start() throws IOException {
super.start(); super.start();
pluginExecutor.execute(createBinder()); pluginExecutor.execute(new Runnable() {
}
private Runnable createBinder() {
return new Runnable() {
public void run() { public void run() {
bind(); bind();
} }
}; });
} }
private void bind() { private void bind() {
@@ -93,25 +89,11 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
socket = ss; socket = ss;
setLocalSocketAddress(ss.getLocalSocketAddress()); setLocalSocketAddress(ss.getLocalSocketAddress());
} }
startListenerThread(); // Accept connections until the socket is closed
}
private void startListenerThread() {
new Thread() {
@Override
public void run() {
listen();
}
}.start();
}
private void listen() {
while(true) { while(true) {
ServerSocket ss;
Socket s; Socket s;
synchronized(this) { synchronized(this) {
if(!started) return; if(!started) return;
ss = socket;
} }
try { try {
s = ss.accept(); s = ss.accept();

View File

@@ -1,6 +1,7 @@
package net.sf.briar.plugins; package net.sf.briar.plugins;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase; import junit.framework.TestCase;
@@ -37,7 +38,7 @@ public class PluginManagerImplTest extends TestCase {
allowing(db).setLocalProperties(with(any(TransportId.class)), allowing(db).setLocalProperties(with(any(TransportId.class)),
with(any(TransportProperties.class))); with(any(TransportProperties.class)));
}}); }});
Executor executor = new ImmediateExecutor(); Executor executor = Executors.newCachedThreadPool();
Poller poller = new PollerImpl(); Poller poller = new PollerImpl();
PluginManagerImpl p = new PluginManagerImpl(db, executor, poller, PluginManagerImpl p = new PluginManagerImpl(db, executor, poller,
dispatcher, uiCallback); dispatcher, uiCallback);

View File

@@ -6,6 +6,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import junit.framework.TestCase; import junit.framework.TestCase;
@@ -46,8 +47,8 @@ public class PollingRemovableDriveMonitorTest extends TestCase {
} }
}; };
// Create the monitor and start it // Create the monitor and start it
final RemovableDriveMonitor monitor = final RemovableDriveMonitor monitor = new PollingRemovableDriveMonitor(
new PollingRemovableDriveMonitor(finder, 10); Executors.newCachedThreadPool(), finder, 10);
monitor.start(callback); monitor.start(callback);
// Wait for the monitor to detect the files // Wait for the monitor to detect the files
assertTrue(latch.await(1, TimeUnit.SECONDS)); assertTrue(latch.await(1, TimeUnit.SECONDS));
@@ -74,8 +75,8 @@ public class PollingRemovableDriveMonitorTest extends TestCase {
will(throwException(new IOException())); will(throwException(new IOException()));
}}); }});
// Create the monitor, start it, and give it some time to run // Create the monitor, start it, and give it some time to run
final RemovableDriveMonitor monitor = final RemovableDriveMonitor monitor = new PollingRemovableDriveMonitor(
new PollingRemovableDriveMonitor(finder, 10); Executors.newCachedThreadPool(), finder, 10);
monitor.start(null); monitor.start(null);
Thread.sleep(50); Thread.sleep(50);
// The monitor should rethrow the exception when it stops // The monitor should rethrow the exception when it stops

View File

@@ -7,6 +7,7 @@ import java.net.Socket;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@@ -16,7 +17,6 @@ import net.sf.briar.api.TransportConfig;
import net.sf.briar.api.TransportProperties; import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.plugins.StreamPluginCallback; import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.transport.StreamTransportConnection; import net.sf.briar.api.transport.StreamTransportConnection;
import net.sf.briar.plugins.ImmediateExecutor;
import org.junit.Test; import org.junit.Test;
@@ -29,10 +29,11 @@ public class SimpleSocketPluginTest extends TestCase {
StreamCallback callback = new StreamCallback(); StreamCallback callback = new StreamCallback();
callback.local.put("internal", "127.0.0.1"); callback.local.put("internal", "127.0.0.1");
callback.local.put("port", "0"); callback.local.put("port", "0");
SimpleSocketPlugin plugin = SimpleSocketPlugin plugin = new SimpleSocketPlugin(
new SimpleSocketPlugin(new ImmediateExecutor(), callback, 0L); Executors.newCachedThreadPool(), callback, 0L);
plugin.start(); plugin.start();
// The plugin should have bound a socket and stored the port number // The plugin should have bound a socket and stored the port number
callback.latch.await(1, TimeUnit.SECONDS);
String host = callback.local.get("internal"); String host = callback.local.get("internal");
assertNotNull(host); assertNotNull(host);
assertEquals("127.0.0.1", host); assertEquals("127.0.0.1", host);
@@ -62,8 +63,8 @@ public class SimpleSocketPluginTest extends TestCase {
@Test @Test
public void testOutgoingConnection() throws Exception { public void testOutgoingConnection() throws Exception {
StreamCallback callback = new StreamCallback(); StreamCallback callback = new StreamCallback();
SimpleSocketPlugin plugin = SimpleSocketPlugin plugin = new SimpleSocketPlugin(
new SimpleSocketPlugin(new ImmediateExecutor(), callback, 0L); Executors.newCachedThreadPool(), callback, 0L);
plugin.start(); plugin.start();
// Listen on a local port // Listen on a local port
final ServerSocket ss = new ServerSocket(); final ServerSocket ss = new ServerSocket();
@@ -101,10 +102,12 @@ public class SimpleSocketPluginTest extends TestCase {
private static class StreamCallback implements StreamPluginCallback { private static class StreamCallback implements StreamPluginCallback {
private TransportConfig config = new TransportConfig();
private TransportProperties local = new TransportProperties();
private final Map<ContactId, TransportProperties> remote = private final Map<ContactId, TransportProperties> remote =
new HashMap<ContactId, TransportProperties>(); new HashMap<ContactId, TransportProperties>();
private final CountDownLatch latch = new CountDownLatch(1);
private TransportConfig config = new TransportConfig();
private TransportProperties local = new TransportProperties();
private int incomingConnections = 0; private int incomingConnections = 0;
@@ -125,6 +128,7 @@ public class SimpleSocketPluginTest extends TestCase {
} }
public void setLocalProperties(TransportProperties p) { public void setLocalProperties(TransportProperties p) {
latch.countDown();
local = p; local = p;
} }