Added BoundedExecutor and documented executor policies.

This commit is contained in:
akwizgran
2011-12-10 00:59:29 +00:00
parent e47d4990c3
commit f2de23854e
10 changed files with 111 additions and 128 deletions

View File

@@ -14,4 +14,4 @@ import com.google.inject.BindingAnnotation;
@BindingAnnotation @BindingAnnotation
@Target({ PARAMETER }) @Target({ PARAMETER })
@Retention(RUNTIME) @Retention(RUNTIME)
public @interface ConnectionRecogniserExecutor {} public @interface IncomingConnectionExecutor {}

View File

@@ -1,56 +0,0 @@
package net.sf.briar.db;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* An executor that limits the number of concurrent database tasks and the
* number of tasks queued for execution.
*/
class DatabaseExecutorImpl implements Executor {
// FIXME: Determine suitable values for these constants empirically
/**
* The maximum number of tasks that can be queued for execution
* before attempting to execute another task will block.
*/
private static final int MAX_QUEUED_TASKS = 10;
/** The number of idle threads to keep in the pool. */
private static final int MIN_THREADS = 1;
/** The maximum number of concurrent tasks. */
private static final int MAX_THREADS = 10;
private static final Logger LOG =
Logger.getLogger(DatabaseExecutorImpl.class.getName());
private final BlockingQueue<Runnable> queue;
DatabaseExecutorImpl() {
this(MAX_QUEUED_TASKS, MIN_THREADS, MAX_THREADS);
}
DatabaseExecutorImpl(int maxQueued, int minThreads, int maxThreads) {
queue = new ArrayBlockingQueue<Runnable>(maxQueued);
new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS,
queue);
}
public void execute(Runnable r) {
try {
// Block until there's space in the queue
queue.put(r);
} catch(InterruptedException e) {
if(LOG.isLoggable(Level.INFO))
LOG.info("Interrupted while queueing task");
Thread.currentThread().interrupt();
}
}
}

View File

@@ -15,6 +15,7 @@ import net.sf.briar.api.protocol.GroupFactory;
import net.sf.briar.api.protocol.PacketFactory; import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.transport.ConnectionContextFactory; import net.sf.briar.api.transport.ConnectionContextFactory;
import net.sf.briar.api.transport.ConnectionWindowFactory; import net.sf.briar.api.transport.ConnectionWindowFactory;
import net.sf.briar.util.BoundedExecutor;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
@@ -22,11 +23,27 @@ import com.google.inject.Singleton;
public class DatabaseModule extends AbstractModule { public class DatabaseModule extends AbstractModule {
// FIXME: Determine suitable values for these constants empirically
/**
* The maximum number of database tasks that can be queued for execution
* before submitting another task will block.
*/
private static final int MAX_QUEUED_DB_TASKS = 10;
/** The minimum number of database threads to keep in the pool. */
private static final int MIN_DB_THREADS = 1;
/** The maximum number of database threads. */
private static final int MAX_DB_THREADS = 10;
@Override @Override
protected void configure() { protected void configure() {
bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class); bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class);
// The executor is bounded, so tasks must be independent and short-lived
bind(Executor.class).annotatedWith(DatabaseExecutor.class).toInstance( bind(Executor.class).annotatedWith(DatabaseExecutor.class).toInstance(
new DatabaseExecutorImpl()); new BoundedExecutor(MAX_QUEUED_DB_TASKS, MIN_DB_THREADS,
MAX_DB_THREADS));
} }
@Provides @Provides

View File

@@ -13,6 +13,7 @@ public class PluginsModule extends AbstractModule {
@Override @Override
protected void configure() { protected void configure() {
// The executor is unbounded, so tasks can be dependent or long-lived
bind(ExecutorService.class).annotatedWith( bind(ExecutorService.class).annotatedWith(
PluginExecutor.class).toInstance( PluginExecutor.class).toInstance(
Executors.newCachedThreadPool()); Executors.newCachedThreadPool());

View File

@@ -19,12 +19,28 @@ import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch; import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.serial.ObjectReader; import net.sf.briar.api.serial.ObjectReader;
import net.sf.briar.util.BoundedExecutor;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
public class ProtocolModule extends AbstractModule { public class ProtocolModule extends AbstractModule {
// FIXME: Determine suitable values for these constants empirically
/**
* The maximum number of verification tasks that can be queued for
* execution before submitting another task will block.
*/
private static final int MAX_QUEUED_VERIFIER_TASKS = 10;
/** The minimum number of verification threads to keep in the pool. */
private static final int MIN_VERIFIER_THREADS = 1;
/** The maximum number of verification threads. */
private static final int MAX_VERIFIER_THREADS =
Runtime.getRuntime().availableProcessors();
@Override @Override
protected void configure() { protected void configure() {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class); bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
@@ -34,9 +50,11 @@ public class ProtocolModule extends AbstractModule {
bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class); bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class);
bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class); bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class);
bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class); bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class);
// The executor is bounded, so tasks must be independent and short-lived
bind(Executor.class).annotatedWith( bind(Executor.class).annotatedWith(
VerificationExecutor.class).toInstance( VerificationExecutor.class).toInstance(
new VerificationExecutorImpl()); new BoundedExecutor(MAX_QUEUED_VERIFIER_TASKS,
MIN_VERIFIER_THREADS, MAX_VERIFIER_THREADS));
} }
@Provides @Provides

View File

@@ -1,54 +0,0 @@
package net.sf.briar.protocol;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* An executor that limits the number of concurrent message verification tasks
* and the number of tasks queued for execution.
*/
class VerificationExecutorImpl implements Executor {
// FIXME: Determine suitable values for these constants empirically
/**
* The maximum number of tasks that can be queued for execution
* before attempting to execute another task will block.
*/
private static final int MAX_QUEUED_TASKS = 10;
/** The number of idle threads to keep in the pool. */
private static final int MIN_THREADS = 1;
private static final Logger LOG =
Logger.getLogger(VerificationExecutorImpl.class.getName());
private final BlockingQueue<Runnable> queue;
VerificationExecutorImpl() {
this(MAX_QUEUED_TASKS, MIN_THREADS,
Runtime.getRuntime().availableProcessors());
}
VerificationExecutorImpl(int maxQueued, int minThreads, int maxThreads) {
queue = new ArrayBlockingQueue<Runnable>(maxQueued);
new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS,
queue);
}
public void execute(Runnable r) {
try {
// Block until there's space in the queue
queue.put(r);
} catch(InterruptedException e) {
if(LOG.isLoggable(Level.INFO))
LOG.info("Interrupted while queueing task");
Thread.currentThread().interrupt();
}
}
}

View File

@@ -18,7 +18,7 @@ import net.sf.briar.api.transport.BatchTransportWriter;
import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionDispatcher; import net.sf.briar.api.transport.ConnectionDispatcher;
import net.sf.briar.api.transport.ConnectionRecogniser; import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.ConnectionRecogniserExecutor; import net.sf.briar.api.transport.IncomingConnectionExecutor;
import net.sf.briar.api.transport.StreamTransportConnection; import net.sf.briar.api.transport.StreamTransportConnection;
import net.sf.briar.api.transport.TransportConstants; import net.sf.briar.api.transport.TransportConstants;
@@ -29,24 +29,24 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(ConnectionDispatcherImpl.class.getName()); Logger.getLogger(ConnectionDispatcherImpl.class.getName());
private final Executor executor; private final Executor connExecutor;
private final ConnectionRecogniser recogniser; private final ConnectionRecogniser recogniser;
private final BatchConnectionFactory batchConnFactory; private final BatchConnectionFactory batchConnFactory;
private final StreamConnectionFactory streamConnFactory; private final StreamConnectionFactory streamConnFactory;
@Inject @Inject
ConnectionDispatcherImpl(@ConnectionRecogniserExecutor Executor executor, ConnectionDispatcherImpl(@IncomingConnectionExecutor Executor connExecutor,
ConnectionRecogniser recogniser, ConnectionRecogniser recogniser,
BatchConnectionFactory batchConnFactory, BatchConnectionFactory batchConnFactory,
StreamConnectionFactory streamConnFactory) { StreamConnectionFactory streamConnFactory) {
this.executor = executor; this.connExecutor = connExecutor;
this.recogniser = recogniser; this.recogniser = recogniser;
this.batchConnFactory = batchConnFactory; this.batchConnFactory = batchConnFactory;
this.streamConnFactory = streamConnFactory; this.streamConnFactory = streamConnFactory;
} }
public void dispatchReader(TransportId t, BatchTransportReader r) { public void dispatchReader(TransportId t, BatchTransportReader r) {
executor.execute(new DispatchBatchConnection(t, r)); connExecutor.execute(new DispatchBatchConnection(t, r));
} }
public void dispatchWriter(ContactId c, TransportId t, TransportIndex i, public void dispatchWriter(ContactId c, TransportId t, TransportIndex i,
@@ -56,7 +56,7 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
public void dispatchIncomingConnection(TransportId t, public void dispatchIncomingConnection(TransportId t,
StreamTransportConnection s) { StreamTransportConnection s) {
executor.execute(new DispatchStreamConnection(t, s)); connExecutor.execute(new DispatchStreamConnection(t, s));
} }
public void dispatchOutgoingConnection(ContactId c, TransportId t, public void dispatchOutgoingConnection(ContactId c, TransportId t,

View File

@@ -33,7 +33,7 @@ import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportIndex; import net.sf.briar.api.protocol.TransportIndex;
import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionRecogniser; import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.ConnectionRecogniserExecutor; import net.sf.briar.api.transport.IncomingConnectionExecutor;
import net.sf.briar.api.transport.ConnectionWindow; import net.sf.briar.api.transport.ConnectionWindow;
import net.sf.briar.util.ByteUtils; import net.sf.briar.util.ByteUtils;
@@ -45,7 +45,7 @@ DatabaseListener {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(ConnectionRecogniserImpl.class.getName()); Logger.getLogger(ConnectionRecogniserImpl.class.getName());
private final Executor executor; private final Executor connExecutor;
private final DatabaseComponent db; private final DatabaseComponent db;
private final CryptoComponent crypto; private final CryptoComponent crypto;
private final Cipher tagCipher; // Locking: this private final Cipher tagCipher; // Locking: this
@@ -55,9 +55,9 @@ DatabaseListener {
private boolean initialised = false; // Locking: this private boolean initialised = false; // Locking: this
@Inject @Inject
ConnectionRecogniserImpl(@ConnectionRecogniserExecutor Executor executor, ConnectionRecogniserImpl(@IncomingConnectionExecutor Executor connExecutor,
DatabaseComponent db, CryptoComponent crypto) { DatabaseComponent db, CryptoComponent crypto) {
this.executor = executor; this.connExecutor = connExecutor;
this.db = db; this.db = db;
this.crypto = crypto; this.crypto = crypto;
tagCipher = crypto.getTagCipher(); tagCipher = crypto.getTagCipher();
@@ -154,7 +154,7 @@ DatabaseListener {
if(e instanceof ContactRemovedEvent) { if(e instanceof ContactRemovedEvent) {
// Remove the expected IVs for the ex-contact // Remove the expected IVs for the ex-contact
final ContactId c = ((ContactRemovedEvent) e).getContactId(); final ContactId c = ((ContactRemovedEvent) e).getContactId();
executor.execute(new Runnable() { connExecutor.execute(new Runnable() {
public void run() { public void run() {
removeContact(c); removeContact(c);
} }
@@ -162,7 +162,7 @@ DatabaseListener {
} else if(e instanceof TransportAddedEvent) { } else if(e instanceof TransportAddedEvent) {
// Add the expected IVs for the new transport // Add the expected IVs for the new transport
final TransportId t = ((TransportAddedEvent) e).getTransportId(); final TransportId t = ((TransportAddedEvent) e).getTransportId();
executor.execute(new Runnable() { connExecutor.execute(new Runnable() {
public void run() { public void run() {
addTransport(t); addTransport(t);
} }
@@ -172,7 +172,7 @@ DatabaseListener {
RemoteTransportsUpdatedEvent r = (RemoteTransportsUpdatedEvent) e; RemoteTransportsUpdatedEvent r = (RemoteTransportsUpdatedEvent) e;
final ContactId c = r.getContactId(); final ContactId c = r.getContactId();
final Collection<Transport> transports = r.getTransports(); final Collection<Transport> transports = r.getTransports();
executor.execute(new Runnable() { connExecutor.execute(new Runnable() {
public void run() { public void run() {
updateContact(c, transports); updateContact(c, transports);
} }

View File

@@ -7,7 +7,7 @@ import net.sf.briar.api.transport.ConnectionContextFactory;
import net.sf.briar.api.transport.ConnectionDispatcher; import net.sf.briar.api.transport.ConnectionDispatcher;
import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionRecogniser; import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.ConnectionRecogniserExecutor; import net.sf.briar.api.transport.IncomingConnectionExecutor;
import net.sf.briar.api.transport.ConnectionRegistry; import net.sf.briar.api.transport.ConnectionRegistry;
import net.sf.briar.api.transport.ConnectionWindowFactory; import net.sf.briar.api.transport.ConnectionWindowFactory;
import net.sf.briar.api.transport.ConnectionWriterFactory; import net.sf.briar.api.transport.ConnectionWriterFactory;
@@ -29,8 +29,9 @@ public class TransportModule extends AbstractModule {
ConnectionWindowFactoryImpl.class); ConnectionWindowFactoryImpl.class);
bind(ConnectionWriterFactory.class).to( bind(ConnectionWriterFactory.class).to(
ConnectionWriterFactoryImpl.class); ConnectionWriterFactoryImpl.class);
// The executor is unbounded, so tasks can be dependent or long-lived
bind(Executor.class).annotatedWith( bind(Executor.class).annotatedWith(
ConnectionRecogniserExecutor.class).toInstance( IncomingConnectionExecutor.class).toInstance(
Executors.newCachedThreadPool()); Executors.newCachedThreadPool());
} }
} }

View File

@@ -0,0 +1,56 @@
package net.sf.briar.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* An executor that limits the number of concurrently executing tasks and the
* number of tasks queued for execution.
*/
public class BoundedExecutor implements Executor {
private static final Logger LOG =
Logger.getLogger(BoundedExecutor.class.getName());
private final Semaphore semaphore;
private final BlockingQueue<Runnable> queue;
private final Executor executor;
public BoundedExecutor(int maxQueued, int minThreads, int maxThreads) {
semaphore = new Semaphore(maxQueued + maxThreads);
queue = new LinkedBlockingQueue<Runnable>();
executor = new ThreadPoolExecutor(minThreads, maxThreads, 60,
TimeUnit.SECONDS, queue);
}
public void execute(final Runnable r) {
try {
semaphore.acquire();
executor.execute(new Runnable() {
public void run() {
try {
r.run();
} finally {
semaphore.release();
}
}
});
} catch(InterruptedException e) {
if(LOG.isLoggable(Level.INFO))
LOG.info("Interrupted while queueing task");
Thread.currentThread().interrupt();
throw new RejectedExecutionException();
} catch(RejectedExecutionException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
semaphore.release();
throw e;
}
}
}