Executors and Services register themselves with the LifecycleManager.

Fixes issue #3612607.
This commit is contained in:
akwizgran
2013-05-15 12:26:56 +01:00
parent dddd15cd10
commit 630cfde81e
30 changed files with 301 additions and 189 deletions

View File

@@ -11,8 +11,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.crypto.CryptoExecutor;
import net.sf.briar.api.lifecycle.LifecycleManager;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
public class CryptoModule extends AbstractModule {
@@ -21,20 +23,28 @@ public class CryptoModule extends AbstractModule {
private static final int MAX_EXECUTOR_THREADS =
Runtime.getRuntime().availableProcessors();
@Override
protected void configure() {
bind(CryptoComponent.class).to(
CryptoComponentImpl.class).in(Singleton.class);
private final ExecutorService cryptoExecutor;
public CryptoModule() {
// The queue is unbounded, so tasks can be dependent
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create a limited # of threads and keep them in the pool for 60 secs
ExecutorService e = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
cryptoExecutor = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(CryptoExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
CryptoExecutor.class).toInstance(e);
}
@Override
protected void configure() {
bind(CryptoComponent.class).to(
CryptoComponentImpl.class).in(Singleton.class);
}
@Provides @Singleton @CryptoExecutor
Executor getCryptoExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(cryptoExecutor);
return cryptoExecutor;
}
}

View File

@@ -15,6 +15,7 @@ import net.sf.briar.api.clock.SystemClock;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseConfig;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.lifecycle.LifecycleManager;
import net.sf.briar.api.lifecycle.ShutdownManager;
import com.google.inject.AbstractModule;
@@ -26,21 +27,22 @@ public class DatabaseModule extends AbstractModule {
/** The maximum number of executor threads. */
private static final int MAX_EXECUTOR_THREADS = 10;
@Override
protected void configure() {
bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class);
private final ExecutorService databaseExecutor;
public DatabaseModule() {
// The queue is unbounded, so tasks can be dependent
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create a limited # of threads and keep them in the pool for 60 secs
ExecutorService e = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
databaseExecutor = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(
DatabaseExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
DatabaseExecutor.class).toInstance(e);
}
@Override
protected void configure() {
bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class);
}
@Provides
@@ -54,4 +56,10 @@ public class DatabaseModule extends AbstractModule {
return new DatabaseComponentImpl<Connection>(db, cleaner, shutdown,
clock);
}
@Provides @Singleton @DatabaseExecutor
Executor getDatabaseExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(databaseExecutor);
return databaseExecutor;
}
}

View File

@@ -4,20 +4,16 @@ import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.logging.Logger;
import net.sf.briar.api.crypto.CryptoExecutor;
import net.sf.briar.api.crypto.KeyManager;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.lifecycle.LifecycleManager;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.PluginManager;
import net.sf.briar.api.reliability.ReliabilityExecutor;
import net.sf.briar.api.transport.IncomingConnectionExecutor;
import net.sf.briar.api.lifecycle.Service;
import com.google.inject.Inject;
@@ -27,33 +23,29 @@ class LifecycleManagerImpl implements LifecycleManager {
Logger.getLogger(LifecycleManagerImpl.class.getName());
private final DatabaseComponent db;
private final KeyManager keyManager;
private final PluginManager pluginManager;
private final ExecutorService cryptoExecutor;
private final ExecutorService dbExecutor;
private final ExecutorService connExecutor;
private final ExecutorService pluginExecutor;
private final ExecutorService reliabilityExecutor;
private final Collection<Service> services;
private final Collection<ExecutorService> executors;
private final CountDownLatch dbLatch = new CountDownLatch(1);
private final CountDownLatch startupLatch = new CountDownLatch(1);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
@Inject
LifecycleManagerImpl(DatabaseComponent db, KeyManager keyManager,
PluginManager pluginManager,
@CryptoExecutor ExecutorService cryptoExecutor,
@DatabaseExecutor ExecutorService dbExecutor,
@IncomingConnectionExecutor ExecutorService connExecutor,
@PluginExecutor ExecutorService pluginExecutor,
@ReliabilityExecutor ExecutorService reliabilityExecutor) {
LifecycleManagerImpl(DatabaseComponent db) {
this.db = db;
this.keyManager = keyManager;
this.pluginManager = pluginManager;
this.cryptoExecutor = cryptoExecutor;
this.dbExecutor = dbExecutor;
this.connExecutor = connExecutor;
this.pluginExecutor = pluginExecutor;
this.reliabilityExecutor = reliabilityExecutor;
services = new CopyOnWriteArrayList<Service>();
executors = new CopyOnWriteArrayList<ExecutorService>();
}
public void register(Service s) {
if(LOG.isLoggable(INFO))
LOG.info("Registering service " + s.getClass().getName());
services.add(s);
}
public void registerForShutdown(ExecutorService e) {
if(LOG.isLoggable(INFO))
LOG.info("Registering executor " + e.getClass().getName());
executors.add(e);
}
public void startServices() {
@@ -65,11 +57,14 @@ class LifecycleManagerImpl implements LifecycleManager {
else LOG.info("Database created");
}
dbLatch.countDown();
keyManager.start();
if(LOG.isLoggable(INFO)) LOG.info("Key manager started");
int pluginsStarted = pluginManager.start();
if(LOG.isLoggable(INFO))
LOG.info(pluginsStarted + " plugins started");
for(Service s : services) {
boolean started = s.start();
if(LOG.isLoggable(INFO)) {
String name = s.getClass().getName();
if(started) LOG.info("Service started: " + name);
else LOG.info("Service failed to start: " + name);
}
}
startupLatch.countDown();
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -81,19 +76,19 @@ class LifecycleManagerImpl implements LifecycleManager {
public void stopServices() {
try {
if(LOG.isLoggable(INFO)) LOG.info("Shutting down");
int pluginsStopped = pluginManager.stop();
for(Service s : services) {
boolean stopped = s.stop();
if(LOG.isLoggable(INFO)) {
String name = s.getClass().getName();
if(stopped) LOG.info("Service stopped: " + name);
else LOG.warning("Service failed to stop: " + name);
}
}
for(ExecutorService e : executors) e.shutdownNow();
if(LOG.isLoggable(INFO))
LOG.info(pluginsStopped + " plugins stopped");
keyManager.stop();
if(LOG.isLoggable(INFO)) LOG.info("Key manager stopped");
LOG.info(executors.size() + " executors shut down");
db.close();
if(LOG.isLoggable(INFO)) LOG.info("Database closed");
cryptoExecutor.shutdownNow();
dbExecutor.shutdownNow();
connExecutor.shutdownNow();
pluginExecutor.shutdownNow();
reliabilityExecutor.shutdownNow();
if(LOG.isLoggable(INFO)) LOG.info("Executors shut down");
shutdownLatch.countDown();
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);

View File

@@ -12,7 +12,6 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
@@ -75,23 +74,21 @@ class PluginManagerImpl implements PluginManager {
duplexPlugins = new CopyOnWriteArrayList<DuplexPlugin>();
}
public synchronized int start() {
public synchronized boolean start() {
// Instantiate and start the simplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Starting simplex plugins");
Collection<SimplexPluginFactory> sFactories =
simplexPluginConfig.getFactories();
final CountDownLatch sLatch = new CountDownLatch(sFactories.size());
for(SimplexPluginFactory factory : sFactories) {
for(SimplexPluginFactory factory : sFactories)
pluginExecutor.execute(new SimplexPluginStarter(factory, sLatch));
}
// Instantiate and start the duplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Starting duplex plugins");
Collection<DuplexPluginFactory> dFactories =
duplexPluginConfig.getFactories();
final CountDownLatch dLatch = new CountDownLatch(dFactories.size());
for(DuplexPluginFactory factory : dFactories) {
for(DuplexPluginFactory factory : dFactories)
pluginExecutor.execute(new DuplexPluginStarter(factory, dLatch));
}
// Wait for the plugins to start
try {
sLatch.await();
@@ -100,7 +97,7 @@ class PluginManagerImpl implements PluginManager {
if(LOG.isLoggable(WARNING))
LOG.warning("Interrupted while starting plugins");
Thread.currentThread().interrupt();
return 0;
return false;
}
// Start the poller
if(LOG.isLoggable(INFO)) LOG.info("Starting poller");
@@ -108,27 +105,23 @@ class PluginManagerImpl implements PluginManager {
plugins.addAll(simplexPlugins);
plugins.addAll(duplexPlugins);
poller.start(Collections.unmodifiableList(plugins));
// Return the number of plugins successfully started
return plugins.size();
return true;
}
public synchronized int stop() {
public synchronized boolean stop() {
// Stop the poller
if(LOG.isLoggable(INFO)) LOG.info("Stopping poller");
poller.stop();
final AtomicInteger stopped = new AtomicInteger(0);
int plugins = simplexPlugins.size() + duplexPlugins.size();
final CountDownLatch latch = new CountDownLatch(plugins);
// Stop the simplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Stopping simplex plugins");
for(SimplexPlugin plugin : simplexPlugins) {
pluginExecutor.execute(new PluginStopper(plugin, latch, stopped));
}
for(SimplexPlugin plugin : simplexPlugins)
pluginExecutor.execute(new PluginStopper(plugin, latch));
// Stop the duplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Stopping duplex plugins");
for(DuplexPlugin plugin : duplexPlugins) {
pluginExecutor.execute(new PluginStopper(plugin, latch, stopped));
}
for(DuplexPlugin plugin : duplexPlugins)
pluginExecutor.execute(new PluginStopper(plugin, latch));
simplexPlugins.clear();
duplexPlugins.clear();
// Wait for all the plugins to stop
@@ -138,10 +131,9 @@ class PluginManagerImpl implements PluginManager {
if(LOG.isLoggable(WARNING))
LOG.warning("Interrupted while stopping plugins");
Thread.currentThread().interrupt();
return 0;
return false;
}
// Return the number of plugins successfully stopped
return stopped.get();
return true;
}
public Collection<DuplexPlugin> getInvitationPlugins() {
@@ -253,19 +245,15 @@ class PluginManagerImpl implements PluginManager {
private final Plugin plugin;
private final CountDownLatch latch;
private final AtomicInteger stopped;
private PluginStopper(Plugin plugin, CountDownLatch latch,
AtomicInteger stopped) {
private PluginStopper(Plugin plugin, CountDownLatch latch) {
this.plugin = plugin;
this.latch = latch;
this.stopped = stopped;
}
public void run() {
try {
plugin.stop();
stopped.incrementAndGet();
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {

View File

@@ -9,29 +9,44 @@ import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.lifecycle.LifecycleManager;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.PluginManager;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
public class PluginsModule extends AbstractModule {
@Override
protected void configure() {
bind(PluginManager.class).to(
PluginManagerImpl.class).in(Singleton.class);
bind(Poller.class).to(PollerImpl.class);
private final ExecutorService pluginExecutor;
public PluginsModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ExecutorService e = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
pluginExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(PluginExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
PluginExecutor.class).toInstance(e);
}
@Override
protected void configure() {
bind(Poller.class).to(PollerImpl.class);
}
@Provides @Singleton
PluginManager getPluginManager(LifecycleManager lifecycleManager,
PluginManagerImpl pluginManager) {
lifecycleManager.register(pluginManager);
return pluginManager;
}
@Provides @Singleton @PluginExecutor
Executor getPluginExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(pluginExecutor);
return pluginExecutor;
}
}

View File

@@ -9,28 +9,38 @@ import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.lifecycle.LifecycleManager;
import net.sf.briar.api.reliability.ReliabilityExecutor;
import net.sf.briar.api.reliability.ReliabilityLayerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
public class ReliabilityModule extends AbstractModule {
@Override
protected void configure() {
bind(ReliabilityLayerFactory.class).to(
ReliabilityLayerFactoryImpl.class);
private final ExecutorService reliabilityExecutor;
public ReliabilityModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ExecutorService e = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
reliabilityExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(
ReliabilityExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
ReliabilityExecutor.class).toInstance(e);
}
@Override
protected void configure() {
bind(ReliabilityLayerFactory.class).to(
ReliabilityLayerFactoryImpl.class);
}
@Provides @Singleton @ReliabilityExecutor
Executor getReliabilityExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(reliabilityExecutor);
return reliabilityExecutor;
}
}

View File

@@ -213,7 +213,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, DatabaseListener {
return created;
}
public synchronized void stop() {
public synchronized boolean stop() {
db.removeListener(this);
timer.cancel();
connectionRecogniser.removeSecrets();
@@ -221,6 +221,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, DatabaseListener {
removeAndEraseSecrets(oldSecrets);
removeAndEraseSecrets(currentSecrets);
removeAndEraseSecrets(newSecrets);
return true;
}
// Locking: this

View File

@@ -10,6 +10,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.crypto.KeyManager;
import net.sf.briar.api.lifecycle.LifecycleManager;
import net.sf.briar.api.transport.ConnectionDispatcher;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionRecogniser;
@@ -18,32 +19,46 @@ import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.IncomingConnectionExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
public class TransportModule extends AbstractModule {
@Override
protected void configure() {
bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class);
bind(ConnectionReaderFactory.class).to(
ConnectionReaderFactoryImpl.class);
bind(ConnectionRecogniser.class).to(ConnectionRecogniserImpl.class).in(
Singleton.class);
bind(ConnectionRegistry.class).toInstance(new ConnectionRegistryImpl());
bind(ConnectionWriterFactory.class).to(
ConnectionWriterFactoryImpl.class);
bind(KeyManager.class).to(KeyManagerImpl.class).in(Singleton.class);
private final ExecutorService incomingConnectionExecutor;
public TransportModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ExecutorService e = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(
IncomingConnectionExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
IncomingConnectionExecutor.class).toInstance(e);
incomingConnectionExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class);
bind(ConnectionReaderFactory.class).to(
ConnectionReaderFactoryImpl.class);
bind(ConnectionRecogniser.class).to(
ConnectionRecogniserImpl.class).in(Singleton.class);
bind(ConnectionRegistry.class).toInstance(new ConnectionRegistryImpl());
bind(ConnectionWriterFactory.class).to(
ConnectionWriterFactoryImpl.class);
}
@Provides @Singleton
KeyManager getKeyManager(LifecycleManager lifecycleManager,
KeyManagerImpl keyManager) {
lifecycleManager.register(keyManager);
return keyManager;
}
@Provides @Singleton @IncomingConnectionExecutor
Executor getIncomingConnectionExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(incomingConnectionExecutor);
return incomingConnectionExecutor;
}
}