Cancel outstanding tasks and shut down the executor.

This commit is contained in:
akwizgran
2011-12-09 21:13:53 +00:00
parent cd068e89c0
commit 8af7e72943
6 changed files with 29 additions and 15 deletions

View File

@@ -6,10 +6,10 @@ public interface PluginManager {
* Starts the plugins and returns the number of plugins successfully * Starts the plugins and returns the number of plugins successfully
* started. * started.
*/ */
int startPlugins(); int start();
/** /**
* Stops the plugins and returns the number of plugins successfully stopped. * Stops the plugins and returns the number of plugins successfully stopped.
*/ */
int stopPlugins(); int stop();
} }

View File

@@ -76,7 +76,7 @@ class PluginManagerImpl implements PluginManager {
return batchPlugins.size() + streamPlugins.size(); return batchPlugins.size() + streamPlugins.size();
} }
public synchronized int startPlugins() { public synchronized int start() {
Set<TransportId> ids = new HashSet<TransportId>(); Set<TransportId> ids = new HashSet<TransportId>();
// Instantiate and start the batch plugins // Instantiate and start the batch plugins
for(String s : BATCH_FACTORIES) { for(String s : BATCH_FACTORIES) {
@@ -162,12 +162,12 @@ class PluginManagerImpl implements PluginManager {
List<Plugin> plugins = new ArrayList<Plugin>(); List<Plugin> plugins = new ArrayList<Plugin>();
plugins.addAll(batchPlugins); plugins.addAll(batchPlugins);
plugins.addAll(streamPlugins); plugins.addAll(streamPlugins);
poller.startPolling(Collections.unmodifiableList(plugins)); poller.start(Collections.unmodifiableList(plugins));
// Return the number of plugins successfully started // Return the number of plugins successfully started
return batchPlugins.size() + streamPlugins.size(); return batchPlugins.size() + streamPlugins.size();
} }
public synchronized int stopPlugins() { public synchronized int stop() {
int stopped = 0; int stopped = 0;
// Stop the batch plugins // Stop the batch plugins
for(BatchPlugin plugin : batchPlugins) { for(BatchPlugin plugin : batchPlugins) {
@@ -189,6 +189,10 @@ class PluginManagerImpl implements PluginManager {
} }
} }
streamPlugins.clear(); streamPlugins.clear();
// Stop the poller
poller.stop();
// Shut down the executor service
pluginExecutor.shutdown();
// Return the number of plugins successfully stopped // Return the number of plugins successfully stopped
return stopped; return stopped;
} }

View File

@@ -7,8 +7,8 @@ import net.sf.briar.api.plugins.Plugin;
interface Poller { interface Poller {
/** Starts a new thread to poll the given collection of plugins. */ /** Starts a new thread to poll the given collection of plugins. */
void startPolling(Collection<Plugin> plugins); void start(Collection<Plugin> plugins);
/** Tells the poller thread to exit. */ /** Tells the poller thread to exit. */
void stopPolling(); void stop();
} }

View File

@@ -26,7 +26,7 @@ class PollerImpl implements Poller, Runnable {
pollTimes = new TreeSet<PollTime>(); pollTimes = new TreeSet<PollTime>();
} }
public synchronized void startPolling(Collection<Plugin> plugins) { public synchronized void start(Collection<Plugin> plugins) {
for(Plugin plugin : plugins) schedule(plugin); for(Plugin plugin : plugins) schedule(plugin);
new Thread(this).start(); new Thread(this).start();
} }
@@ -39,7 +39,7 @@ class PollerImpl implements Poller, Runnable {
} }
} }
public synchronized void stopPolling() { public synchronized void stop() {
pollTimes.clear(); pollTimes.clear();
notifyAll(); notifyAll();
} }

View File

@@ -1,6 +1,7 @@
package net.sf.briar.plugins.bluetooth; package net.sf.briar.plugins.bluetooth;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@@ -8,6 +9,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -43,6 +45,7 @@ class BluetoothPlugin implements StreamPlugin {
private final StreamPluginCallback callback; private final StreamPluginCallback callback;
private final long pollingInterval; private final long pollingInterval;
private final Object discoveryLock = new Object(); private final Object discoveryLock = new Object();
private final Collection<ScheduledFuture<?>> socketClosers; // Locking: this
private boolean running = false; // Locking: this private boolean running = false; // Locking: this
private LocalDevice localDevice = null; // Locking: this private LocalDevice localDevice = null; // Locking: this
@@ -53,6 +56,7 @@ class BluetoothPlugin implements StreamPlugin {
this.pluginExecutor = pluginExecutor; this.pluginExecutor = pluginExecutor;
this.callback = callback; this.callback = callback;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;
socketClosers = new ArrayList<ScheduledFuture<?>>();
} }
public TransportId getId() { public TransportId getId() {
@@ -167,6 +171,7 @@ class BluetoothPlugin implements StreamPlugin {
public synchronized void stop() throws IOException { public synchronized void stop() throws IOException {
running = false; running = false;
for(ScheduledFuture<?> close : socketClosers) close.cancel(false);
localDevice = null; localDevice = null;
if(socket != null) { if(socket != null) {
socket.close(); socket.close();
@@ -377,11 +382,15 @@ class BluetoothPlugin implements StreamPlugin {
return; return;
} }
// Close the socket when the invitation times out // Close the socket when the invitation times out
pluginExecutor.schedule(new Runnable() { Runnable close = new Runnable() {
public void run() { public void run() {
tryToClose(scn); tryToClose(scn);
} }
}, c.getTimeout(), TimeUnit.MILLISECONDS); };
synchronized(this) {
socketClosers.add(pluginExecutor.schedule(close, c.getTimeout(),
TimeUnit.MILLISECONDS));
}
try { try {
StreamConnection s = scn.acceptAndOpen(); StreamConnection s = scn.acceptAndOpen();
c.addConnection(s); c.addConnection(s);

View File

@@ -30,7 +30,7 @@ public class PluginManagerImplTest extends BriarTestCase {
final UiCallback uiCallback = context.mock(UiCallback.class); final UiCallback uiCallback = context.mock(UiCallback.class);
final AtomicInteger index = new AtomicInteger(0); final AtomicInteger index = new AtomicInteger(0);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(poller).startPolling(with(any(Collection.class))); oneOf(poller).start(with(any(Collection.class)));
allowing(db).getLocalIndex(with(any(TransportId.class))); allowing(db).getLocalIndex(with(any(TransportId.class)));
will(returnValue(null)); will(returnValue(null));
allowing(db).addTransport(with(any(TransportId.class))); allowing(db).addTransport(with(any(TransportId.class)));
@@ -41,17 +41,18 @@ public class PluginManagerImplTest extends BriarTestCase {
will(returnValue(new TransportProperties())); will(returnValue(new TransportProperties()));
allowing(db).setLocalProperties(with(any(TransportId.class)), allowing(db).setLocalProperties(with(any(TransportId.class)),
with(any(TransportProperties.class))); with(any(TransportProperties.class)));
oneOf(poller).stopPolling(); oneOf(poller).stop();
}}); }});
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
PluginManagerImpl p = new PluginManagerImpl(executor, db, poller, PluginManagerImpl p = new PluginManagerImpl(executor, db, poller,
dispatcher, uiCallback); dispatcher, uiCallback);
// We expect either 2 or 3 plugins to be started, depending on whether // We expect either 2 or 3 plugins to be started, depending on whether
// the test machine has a Bluetooth device // the test machine has a Bluetooth device
int started = p.startPlugins(); int started = p.start();
int stopped = p.stopPlugins(); int stopped = p.stop();
assertEquals(started, stopped); assertEquals(started, stopped);
assertTrue(started >= 2); assertTrue(started >= 2);
assertTrue(started <= 3); assertTrue(started <= 3);
context.assertIsSatisfied();
} }
} }