Moved lifecycle management into briar-core and reconfigured executors.

CryptoExecutor and DatabaseExecutor now use bounded thread pools with
unbounded queues, since running too many tasks in parallel is likely to
harm performance; IncomingConnectionExecutor, PluginExecutor and
ReliabilityExecutor use unbounded thread pools with direct handoff,
since their tasks may run indefinitely. There are no longer any bounded
executors, and all executors discard tasks when shutting down, which
fixes issue #3612189.

Responsibility for starting and stopping services has been moved from
BriarService in briar-android to LifecycleManagerImpl in briar-core.
However, BriarService is still responsible for stopping the
Android-specific executors, which is ugly. It would be better if
executors registered themselves with LifecycleManager.
This commit is contained in:
akwizgran
2013-05-04 01:25:30 +01:00
parent e1842e11c5
commit 673d7fa0c3
44 changed files with 432 additions and 583 deletions

View File

@@ -1,27 +1,22 @@
package net.sf.briar.crypto;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.crypto.CryptoExecutor;
import net.sf.briar.util.BoundedExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
public class CryptoModule extends AbstractModule {
// FIXME: Determine suitable values for these constants empirically
/**
* The maximum number of tasks that can be queued for execution before
* submitting another task will block.
*/
private static final int MAX_QUEUED_EXECUTOR_TASKS = 10;
/** The minimum number of executor threads to keep in the pool. */
private static final int MIN_EXECUTOR_THREADS = 1;
/** The maximum number of executor threads. */
private static final int MAX_EXECUTOR_THREADS =
Runtime.getRuntime().availableProcessors();
@@ -30,9 +25,16 @@ public class CryptoModule extends AbstractModule {
protected void configure() {
bind(CryptoComponent.class).to(
CryptoComponentImpl.class).in(Singleton.class);
// The executor is bounded, so tasks must be independent and short-lived
bind(Executor.class).annotatedWith(CryptoExecutor.class).toInstance(
new BoundedExecutor(MAX_QUEUED_EXECUTOR_TASKS,
MIN_EXECUTOR_THREADS, MAX_EXECUTOR_THREADS));
// The queue is unbounded, so tasks can be dependent
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create a limited # of threads and keep them in the pool for 60 secs
ExecutorService e = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(CryptoExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
CryptoExecutor.class).toInstance(e);
}
}

View File

@@ -1,11 +1,13 @@
package net.sf.briar.db;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.sql.Connection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.clock.Clock;
@@ -21,24 +23,24 @@ import com.google.inject.Singleton;
public class DatabaseModule extends AbstractModule {
/**
* The maximum number of database threads. When a task is submitted to the
* database executor and no thread is available to run it, the task will be
* queued.
*/
private static final int MAX_DB_THREADS = 10;
/** How many milliseconds to keep idle threads alive. */
private static final int DB_KEEPALIVE = 60 * 1000;
/** The maximum number of executor threads. */
private static final int MAX_EXECUTOR_THREADS = 10;
@Override
protected void configure() {
bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class);
// Use an unbounded queue to prevent deadlock between submitted tasks
// The queue is unbounded, so tasks can be dependent
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
bind(Executor.class).annotatedWith(DatabaseExecutor.class).toInstance(
new ThreadPoolExecutor(MAX_DB_THREADS, MAX_DB_THREADS,
DB_KEEPALIVE, MILLISECONDS, queue));
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create a limited # of threads and keep them in the pool for 60 secs
ExecutorService e = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(
DatabaseExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
DatabaseExecutor.class).toInstance(e);
}
@Provides

View File

@@ -0,0 +1,116 @@
package net.sf.briar.lifecycle;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.logging.Logger;
import net.sf.briar.api.crypto.CryptoExecutor;
import net.sf.briar.api.crypto.KeyManager;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.lifecycle.LifecycleManager;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.PluginManager;
import net.sf.briar.api.reliability.ReliabilityExecutor;
import net.sf.briar.api.transport.IncomingConnectionExecutor;
import com.google.inject.Inject;
class LifecycleManagerImpl implements LifecycleManager {
private static final Logger LOG =
Logger.getLogger(LifecycleManagerImpl.class.getName());
private final DatabaseComponent db;
private final KeyManager keyManager;
private final PluginManager pluginManager;
private final ExecutorService cryptoExecutor;
private final ExecutorService dbExecutor;
private final ExecutorService connExecutor;
private final ExecutorService pluginExecutor;
private final ExecutorService reliabilityExecutor;
private final CountDownLatch dbLatch = new CountDownLatch(1);
private final CountDownLatch startupLatch = new CountDownLatch(1);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
@Inject
LifecycleManagerImpl(DatabaseComponent db, KeyManager keyManager,
PluginManager pluginManager,
@CryptoExecutor ExecutorService cryptoExecutor,
@DatabaseExecutor ExecutorService dbExecutor,
@IncomingConnectionExecutor ExecutorService connExecutor,
@PluginExecutor ExecutorService pluginExecutor,
@ReliabilityExecutor ExecutorService reliabilityExecutor) {
this.db = db;
this.keyManager = keyManager;
this.pluginManager = pluginManager;
this.cryptoExecutor = cryptoExecutor;
this.dbExecutor = dbExecutor;
this.connExecutor = connExecutor;
this.pluginExecutor = pluginExecutor;
this.reliabilityExecutor = reliabilityExecutor;
}
public void startServices() {
try {
if(LOG.isLoggable(INFO)) LOG.info("Starting");
boolean reopened = db.open();
if(LOG.isLoggable(INFO)) {
if(reopened) LOG.info("Database reopened");
else LOG.info("Database created");
}
dbLatch.countDown();
keyManager.start();
if(LOG.isLoggable(INFO)) LOG.info("Key manager started");
int pluginsStarted = pluginManager.start();
if(LOG.isLoggable(INFO))
LOG.info(pluginsStarted + " plugins started");
startupLatch.countDown();
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
public void stopServices() {
try {
if(LOG.isLoggable(INFO)) LOG.info("Shutting down");
int pluginsStopped = pluginManager.stop();
if(LOG.isLoggable(INFO))
LOG.info(pluginsStopped + " plugins stopped");
keyManager.stop();
if(LOG.isLoggable(INFO)) LOG.info("Key manager stopped");
db.close();
if(LOG.isLoggable(INFO)) LOG.info("Database closed");
cryptoExecutor.shutdownNow();
dbExecutor.shutdownNow();
connExecutor.shutdownNow();
pluginExecutor.shutdownNow();
reliabilityExecutor.shutdownNow();
if(LOG.isLoggable(INFO)) LOG.info("Executors shut down");
shutdownLatch.countDown();
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
public void waitForDatabase() throws InterruptedException {
dbLatch.await();
}
public void waitForStartup() throws InterruptedException {
startupLatch.await();
}
public void waitForShutdown() throws InterruptedException {
shutdownLatch.await();
}
}

View File

@@ -1,14 +1,18 @@
package net.sf.briar.lifecycle;
import net.sf.briar.api.lifecycle.LifecycleManager;
import net.sf.briar.api.lifecycle.ShutdownManager;
import net.sf.briar.util.OsUtils;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
public class LifecycleModule extends AbstractModule {
@Override
protected void configure() {
bind(LifecycleManager.class).to(LifecycleManagerImpl.class).in(
Singleton.class);
if(OsUtils.isWindows())
bind(ShutdownManager.class).to(WindowsShutdownManagerImpl.class);
else bind(ShutdownManager.class).to(ShutdownManagerImpl.class);

View File

@@ -2,7 +2,7 @@ package net.sf.briar.plugins;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.lifecycle.ShutdownManager;
@@ -28,7 +28,7 @@ public class JavaSePluginsModule extends AbstractModule {
@Provides
SimplexPluginConfig getSimplexPluginConfig(
@PluginExecutor ExecutorService pluginExecutor) {
@PluginExecutor Executor pluginExecutor) {
SimplexPluginFactory removable =
new RemovableDrivePluginFactory(pluginExecutor);
final Collection<SimplexPluginFactory> factories =
@@ -42,7 +42,7 @@ public class JavaSePluginsModule extends AbstractModule {
@Provides
DuplexPluginConfig getDuplexPluginConfig(
@PluginExecutor ExecutorService pluginExecutor,
@PluginExecutor Executor pluginExecutor,
CryptoComponent crypto, ReliabilityLayerFactory reliabilityFactory,
ShutdownManager shutdownManager) {
DuplexPluginFactory bluetooth = new BluetoothPluginFactory(

View File

@@ -11,7 +11,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
@@ -19,7 +19,6 @@ import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportConfig;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.android.AndroidExecutor;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.plugins.Plugin;
@@ -49,8 +48,7 @@ class PluginManagerImpl implements PluginManager {
private static final Logger LOG =
Logger.getLogger(PluginManagerImpl.class.getName());
private final ExecutorService pluginExecutor;
private final AndroidExecutor androidExecutor;
private final Executor pluginExecutor;
private final SimplexPluginConfig simplexPluginConfig;
private final DuplexPluginConfig duplexPluginConfig;
private final DatabaseComponent db;
@@ -61,14 +59,12 @@ class PluginManagerImpl implements PluginManager {
private final List<DuplexPlugin> duplexPlugins;
@Inject
PluginManagerImpl(@PluginExecutor ExecutorService pluginExecutor,
AndroidExecutor androidExecutor,
PluginManagerImpl(@PluginExecutor Executor pluginExecutor,
SimplexPluginConfig simplexPluginConfig,
DuplexPluginConfig duplexPluginConfig, DatabaseComponent db,
Poller poller, ConnectionDispatcher dispatcher,
UiCallback uiCallback) {
this.pluginExecutor = pluginExecutor;
this.androidExecutor = androidExecutor;
this.simplexPluginConfig = simplexPluginConfig;
this.duplexPluginConfig = duplexPluginConfig;
this.db = db;
@@ -144,10 +140,6 @@ class PluginManagerImpl implements PluginManager {
Thread.currentThread().interrupt();
return 0;
}
// Shut down the executors
if(LOG.isLoggable(INFO)) LOG.info("Stopping executors");
pluginExecutor.shutdown();
androidExecutor.shutdown();
// Return the number of plugins successfully stopped
return stopped.get();
}

View File

@@ -1,7 +1,13 @@
package net.sf.briar.plugins;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.PluginManager;
@@ -13,12 +19,19 @@ public class PluginsModule extends AbstractModule {
@Override
protected void configure() {
// The executor is unbounded, so tasks can be dependent or long-lived
ExecutorService e = Executors.newCachedThreadPool();
bind(ExecutorService.class).annotatedWith(
PluginExecutor.class).toInstance(e);
bind(PluginManager.class).to(
PluginManagerImpl.class).in(Singleton.class);
bind(Poller.class).to(PollerImpl.class);
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ExecutorService e = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(PluginExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
PluginExecutor.class).toInstance(e);
}
}

View File

@@ -5,7 +5,7 @@ import static java.util.logging.Level.INFO;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
@@ -21,13 +21,13 @@ class PollerImpl implements Poller, Runnable {
private static final Logger LOG =
Logger.getLogger(PollerImpl.class.getName());
private final ExecutorService pluginExecutor;
private final Executor pluginExecutor;
private final ConnectionRegistry connRegistry;
private final Clock clock;
private final SortedSet<PollTime> pollTimes;
@Inject
PollerImpl(@PluginExecutor ExecutorService pluginExecutor,
PollerImpl(@PluginExecutor Executor pluginExecutor,
ConnectionRegistry connRegistry, Clock clock) {
this.pluginExecutor = pluginExecutor;
this.connRegistry = connRegistry;
@@ -71,7 +71,7 @@ class PollerImpl implements Poller, Runnable {
connRegistry.getConnectedContacts(p.plugin.getId());
if(LOG.isLoggable(INFO))
LOG.info("Polling " + p.plugin.getClass().getName());
pluginExecutor.submit(new Runnable() {
pluginExecutor.execute(new Runnable() {
public void run() {
p.plugin.poll(connected);
}

View File

@@ -1,15 +0,0 @@
package net.sf.briar.reliability;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import com.google.inject.BindingAnnotation;
/** Annotation for injecting the executor used by reliability layers. */
@BindingAnnotation
@Target({ PARAMETER })
@Retention(RUNTIME)
@interface ReliabilityExecutor {}

View File

@@ -4,6 +4,7 @@ import java.util.concurrent.Executor;
import net.sf.briar.api.clock.Clock;
import net.sf.briar.api.clock.SystemClock;
import net.sf.briar.api.reliability.ReliabilityExecutor;
import net.sf.briar.api.reliability.ReliabilityLayer;
import net.sf.briar.api.reliability.ReliabilityLayerFactory;
import net.sf.briar.api.reliability.WriteHandler;

View File

@@ -1,8 +1,15 @@
package net.sf.briar.reliability;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.reliability.ReliabilityExecutor;
import net.sf.briar.api.reliability.ReliabilityLayerFactory;
import com.google.inject.AbstractModule;
@@ -11,11 +18,19 @@ public class ReliabilityModule extends AbstractModule {
@Override
protected void configure() {
// The executor is unbounded - tasks are expected to be long-lived
Executor e = Executors.newCachedThreadPool();
bind(Executor.class).annotatedWith(
ReliabilityExecutor.class).toInstance(e);
bind(ReliabilityLayerFactory.class).to(
ReliabilityLayerFactoryImpl.class);
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ExecutorService e = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(
ReliabilityExecutor.class).toInstance(e);
bind(ExecutorService.class).annotatedWith(
ReliabilityExecutor.class).toInstance(e);
}
}

View File

@@ -20,6 +20,7 @@ import net.sf.briar.api.plugins.simplex.SimplexTransportWriter;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionDispatcher;
import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.IncomingConnectionExecutor;
import com.google.inject.Inject;

View File

@@ -1,17 +0,0 @@
package net.sf.briar.transport;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import com.google.inject.BindingAnnotation;
/**
* Annotation for injecting the executor for recognising incoming connections.
*/
@BindingAnnotation
@Target({ PARAMETER })
@Retention(RUNTIME)
@interface IncomingConnectionExecutor {}

View File

@@ -1,7 +1,13 @@
package net.sf.briar.transport;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.briar.api.crypto.KeyManager;
import net.sf.briar.api.transport.ConnectionDispatcher;
@@ -9,6 +15,7 @@ import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.ConnectionRegistry;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.IncomingConnectionExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
@@ -25,10 +32,18 @@ public class TransportModule extends AbstractModule {
bind(ConnectionRegistry.class).toInstance(new ConnectionRegistryImpl());
bind(ConnectionWriterFactory.class).to(
ConnectionWriterFactoryImpl.class);
// The executor is unbounded, so tasks can be dependent or long-lived
Executor e = Executors.newCachedThreadPool();
bind(KeyManager.class).to(KeyManagerImpl.class).in(Singleton.class);
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ExecutorService e = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
bind(Executor.class).annotatedWith(
IncomingConnectionExecutor.class).toInstance(e);
bind(KeyManager.class).to(KeyManagerImpl.class).in(Singleton.class);
bind(ExecutorService.class).annotatedWith(
IncomingConnectionExecutor.class).toInstance(e);
}
}

View File

@@ -1,58 +0,0 @@
package net.sf.briar.util;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Logger;
/**
* An executor that limits the number of concurrently executing tasks and the
* number of tasks queued for execution.
*/
public class BoundedExecutor implements Executor {
private static final Logger LOG =
Logger.getLogger(BoundedExecutor.class.getName());
private final Semaphore semaphore;
private final BlockingQueue<Runnable> queue;
private final Executor executor;
public BoundedExecutor(int maxQueued, int minThreads, int maxThreads) {
semaphore = new Semaphore(maxQueued + maxThreads);
queue = new LinkedBlockingQueue<Runnable>();
executor = new ThreadPoolExecutor(minThreads, maxThreads, 60, SECONDS,
queue);
}
public void execute(final Runnable r) {
try {
semaphore.acquire();
executor.execute(new Runnable() {
public void run() {
try {
r.run();
} finally {
semaphore.release();
}
}
});
} catch(InterruptedException e) {
if(LOG.isLoggable(INFO))
LOG.info("Interrupted while queueing task");
Thread.currentThread().interrupt();
throw new RejectedExecutionException();
} catch(RejectedExecutionException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
semaphore.release();
throw e;
}
}
}