Moved message verification into a separate thread pool.

This commit is contained in:
akwizgran
2011-12-07 21:33:14 +00:00
parent 22dfe947fa
commit e2cb1027af
11 changed files with 161 additions and 37 deletions

View File

@@ -0,0 +1,15 @@
package net.sf.briar.api.protocol;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import com.google.inject.BindingAnnotation;
/** Annotation for injecting the executor for message verification tasks. */
@BindingAnnotation
@Target({ PARAMETER })
@Retention(RUNTIME)
public @interface VerificationExecutor {}

View File

@@ -6,6 +6,10 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* An executor that limits the number of concurrent database tasks and the
* number of tasks queued for execution.
*/
class DatabaseExecutorImpl implements Executor {
// FIXME: Determine suitable values for these constants empirically
@@ -28,14 +32,15 @@ class DatabaseExecutorImpl implements Executor {
this(MAX_QUEUED_TASKS, MIN_THREADS, MAX_THREADS);
}
DatabaseExecutorImpl(int maxQueuedTasks, int minThreads, int maxThreads) {
queue = new ArrayBlockingQueue<Runnable>(maxQueuedTasks);
DatabaseExecutorImpl(int maxQueued, int minThreads, int maxThreads) {
queue = new ArrayBlockingQueue<Runnable>(maxQueued);
new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS,
queue);
}
public void execute(Runnable r) {
try {
// Block until there's space in the queue
queue.put(r);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();

View File

@@ -1,5 +1,7 @@
package net.sf.briar.protocol;
import java.util.concurrent.Executor;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Author;
@@ -15,10 +17,12 @@ import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.serial.ObjectReader;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
public class ProtocolModule extends AbstractModule {
@@ -31,6 +35,8 @@ public class ProtocolModule extends AbstractModule {
bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class);
bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class);
bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class);
bind(Executor.class).annotatedWith(VerificationExecutor.class).to(
VerificationExecutorImpl.class).in(Singleton.class);
}
@Provides

View File

@@ -0,0 +1,47 @@
package net.sf.briar.protocol;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* An executor that limits the number of concurrent message verification tasks
* and the number of tasks queued for execution.
*/
class VerificationExecutorImpl implements Executor {
// FIXME: Determine suitable values for these constants empirically
/**
* The maximum number of tasks that can be queued for execution
* before attempting to execute another task will block.
*/
private static final int MAX_QUEUED_TASKS = 10;
/** The number of idle threads to keep in the pool. */
private static final int MIN_THREADS = 1;
private final BlockingQueue<Runnable> queue;
VerificationExecutorImpl() {
this(MAX_QUEUED_TASKS, MIN_THREADS,
Runtime.getRuntime().availableProcessors());
}
VerificationExecutorImpl(int maxQueued, int minThreads, int maxThreads) {
queue = new ArrayBlockingQueue<Runnable>(maxQueued);
new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS,
queue);
}
public void execute(Runnable r) {
try {
// Block until there's space in the queue
queue.put(r);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@@ -4,9 +4,11 @@ import java.util.concurrent.Executor;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.TransportIndex;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.BatchConnectionFactory;
import net.sf.briar.api.transport.BatchTransportReader;
import net.sf.briar.api.transport.BatchTransportWriter;
@@ -18,7 +20,7 @@ import com.google.inject.Inject;
class BatchConnectionFactoryImpl implements BatchConnectionFactory {
private final Executor executor;
private final Executor dbExecutor, verificationExecutor;
private final DatabaseComponent db;
private final ConnectionReaderFactory connReaderFactory;
private final ConnectionWriterFactory connWriterFactory;
@@ -26,12 +28,14 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
private final ProtocolWriterFactory protoWriterFactory;
@Inject
BatchConnectionFactoryImpl(Executor executor, DatabaseComponent db,
ConnectionReaderFactory connReaderFactory,
BatchConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory) {
this.executor = executor;
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.db = db;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
@@ -42,8 +46,8 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
public void createIncomingConnection(ConnectionContext ctx,
BatchTransportReader r, byte[] tag) {
final IncomingBatchConnection conn = new IncomingBatchConnection(
executor, db, connReaderFactory, protoReaderFactory, ctx, r,
tag);
dbExecutor, verificationExecutor, db, connReaderFactory,
protoReaderFactory, ctx, r, tag);
Runnable read = new Runnable() {
public void run() {
conn.read();

View File

@@ -13,11 +13,13 @@ import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.BatchTransportReader;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionReader;
@@ -28,7 +30,7 @@ class IncomingBatchConnection {
private static final Logger LOG =
Logger.getLogger(IncomingBatchConnection.class.getName());
private final Executor dbExecutor;
private final Executor dbExecutor, verificationExecutor;
private final ConnectionReaderFactory connFactory;
private final DatabaseComponent db;
private final ProtocolReaderFactory protoFactory;
@@ -38,10 +40,12 @@ class IncomingBatchConnection {
private final ContactId contactId;
IncomingBatchConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionReaderFactory connFactory,
ProtocolReaderFactory protoFactory, ConnectionContext ctx,
BatchTransportReader transport, byte[] tag) {
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.connFactory = connFactory;
this.db = db;
this.protoFactory = protoFactory;
@@ -64,7 +68,7 @@ class IncomingBatchConnection {
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasBatch()) {
UnverifiedBatch b = reader.readBatch();
dbExecutor.execute(new ReceiveBatch(b));
verificationExecutor.execute(new VerifyBatch(b));
} else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate s = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(s));
@@ -99,26 +103,41 @@ class IncomingBatchConnection {
}
}
private class ReceiveBatch implements Runnable {
private class VerifyBatch implements Runnable {
private final UnverifiedBatch batch;
private ReceiveBatch(UnverifiedBatch batch) {
private VerifyBatch(UnverifiedBatch batch) {
this.batch = batch;
}
public void run() {
try {
// FIXME: Don't verify on the DB thread
db.receiveBatch(contactId, batch.verify());
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
Batch b = batch.verify();
dbExecutor.execute(new ReceiveBatch(b));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
}
}
}
private class ReceiveBatch implements Runnable {
private final Batch batch;
private ReceiveBatch(Batch batch) {
this.batch = batch;
}
public void run() {
try {
db.receiveBatch(contactId, batch);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
}
}
}
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;

View File

@@ -7,6 +7,7 @@ import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
@@ -20,15 +21,16 @@ class IncomingStreamConnection extends StreamConnection {
private final byte[] tag;
IncomingStreamConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory,
ConnectionContext ctx, StreamTransportConnection connection,
byte[] tag) {
super(dbExecutor, db, connReaderFactory, connWriterFactory,
protoReaderFactory, protoWriterFactory, ctx.getContactId(),
connection);
super(dbExecutor, verificationExecutor, db, connReaderFactory,
connWriterFactory, protoReaderFactory, protoWriterFactory,
ctx.getContactId(), connection);
this.ctx = ctx;
this.tag = tag;
}

View File

@@ -10,6 +10,7 @@ import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.TransportIndex;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
@@ -24,14 +25,16 @@ class OutgoingStreamConnection extends StreamConnection {
private ConnectionContext ctx = null; // Locking: this
OutgoingStreamConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
TransportIndex transportIndex,
StreamTransportConnection connection) {
super(dbExecutor, db, connReaderFactory, connWriterFactory,
protoReaderFactory, protoWriterFactory, contactId, connection);
super(dbExecutor, verificationExecutor, db, connReaderFactory,
connWriterFactory, protoReaderFactory, protoWriterFactory,
contactId, connection);
this.transportIndex = transportIndex;
}

View File

@@ -28,6 +28,7 @@ import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.db.event.MessagesAddedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolReader;
@@ -39,6 +40,7 @@ import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionWriter;
@@ -50,7 +52,6 @@ abstract class StreamConnection implements DatabaseListener {
private static final Logger LOG =
Logger.getLogger(StreamConnection.class.getName());
protected final Executor dbExecutor;
protected final DatabaseComponent db;
protected final ConnectionReaderFactory connReaderFactory;
protected final ConnectionWriterFactory connWriterFactory;
@@ -59,6 +60,7 @@ abstract class StreamConnection implements DatabaseListener {
protected final ContactId contactId;
protected final StreamTransportConnection transport;
private final Executor dbExecutor, verificationExecutor;
private final AtomicBoolean canSendOffer;
private final LinkedList<Runnable> writerTasks; // Locking: this
@@ -68,12 +70,14 @@ abstract class StreamConnection implements DatabaseListener {
private volatile boolean closed = false;
StreamConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
StreamTransportConnection transport) {
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.db = db;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
@@ -121,7 +125,7 @@ abstract class StreamConnection implements DatabaseListener {
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasBatch()) {
UnverifiedBatch b = reader.readBatch();
dbExecutor.execute(new ReceiveBatch(b));
verificationExecutor.execute(new VerifyBatch(b));
} else if(reader.hasOffer()) {
Offer o = reader.readOffer();
dbExecutor.execute(new ReceiveOffer(o));
@@ -232,27 +236,43 @@ abstract class StreamConnection implements DatabaseListener {
}
}
// This task runs on a database thread
private class ReceiveBatch implements Runnable {
// This task runs on a verification thread
private class VerifyBatch implements Runnable {
private final UnverifiedBatch batch;
private ReceiveBatch(UnverifiedBatch batch) {
private VerifyBatch(UnverifiedBatch batch) {
this.batch = batch;
}
public void run() {
try {
// FIXME: Don't verify on the DB thread
db.receiveBatch(contactId, batch.verify());
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
Batch b = batch.verify();
dbExecutor.execute(new ReceiveBatch(b));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
}
}
}
// This task runs on a database thread
private class ReceiveBatch implements Runnable {
private final Batch batch;
private ReceiveBatch(Batch batch) {
this.batch = batch;
}
public void run() {
try {
db.receiveBatch(contactId, batch);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
}
}
}
// This task runs on a database thread
private class ReceiveOffer implements Runnable {

View File

@@ -8,6 +8,7 @@ import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.TransportIndex;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionWriterFactory;
@@ -18,7 +19,7 @@ import com.google.inject.Inject;
class StreamConnectionFactoryImpl implements StreamConnectionFactory {
private final Executor dbExecutor;
private final Executor dbExecutor, verificationExecutor;
private final DatabaseComponent db;
private final ConnectionReaderFactory connReaderFactory;
private final ConnectionWriterFactory connWriterFactory;
@@ -27,11 +28,13 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
@Inject
StreamConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory) {
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.db = db;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
@@ -42,8 +45,8 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
public void createIncomingConnection(ConnectionContext ctx,
StreamTransportConnection s, byte[] tag) {
final StreamConnection conn = new IncomingStreamConnection(dbExecutor,
db, connReaderFactory, connWriterFactory, protoReaderFactory,
protoWriterFactory, ctx, s, tag);
verificationExecutor, db, connReaderFactory, connWriterFactory,
protoReaderFactory, protoWriterFactory, ctx, s, tag);
Runnable write = new Runnable() {
public void run() {
conn.write();
@@ -61,8 +64,8 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
public void createOutgoingConnection(ContactId c, TransportIndex i,
StreamTransportConnection s) {
final StreamConnection conn = new OutgoingStreamConnection(dbExecutor,
db, connReaderFactory, connWriterFactory, protoReaderFactory,
protoWriterFactory, c, i, s);
verificationExecutor, db, connReaderFactory, connWriterFactory,
protoReaderFactory, protoWriterFactory, c, i, s);
Runnable write = new Runnable() {
public void run() {
conn.write();

View File

@@ -186,8 +186,8 @@ public class BatchConnectionReadWriteTest extends TestCase {
bob.getInstance(ProtocolReaderFactory.class);
BatchTransportReader reader = new TestBatchTransportReader(in);
IncomingBatchConnection batchIn = new IncomingBatchConnection(
new ImmediateExecutor(), db, connFactory, protoFactory, ctx,
reader, tag);
new ImmediateExecutor(), new ImmediateExecutor(), db,
connFactory, protoFactory, ctx, reader, tag);
// No messages should have been added yet
assertFalse(listener.messagesAdded);
// Read whatever needs to be read