mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-14 11:49:04 +01:00
Limit the number of validation tasks on the crypto executor.
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
package org.briarproject.bramble.sync;
|
||||
|
||||
import org.briarproject.bramble.PoliteExecutor;
|
||||
import org.briarproject.bramble.api.crypto.CryptoComponent;
|
||||
import org.briarproject.bramble.api.crypto.CryptoExecutor;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
@@ -29,6 +31,13 @@ public class SyncModule {
|
||||
ValidationManager validationManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum number of validation tasks to delegate to the crypto
|
||||
* executor concurrently.
|
||||
*/
|
||||
private static final int MAX_CONCURRENT_VALIDATION_TASKS =
|
||||
Runtime.getRuntime().availableProcessors();
|
||||
|
||||
@Provides
|
||||
GroupFactory provideGroupFactory(CryptoComponent crypto) {
|
||||
return new GroupFactoryImpl(crypto);
|
||||
@@ -62,10 +71,20 @@ public class SyncModule {
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
ValidationManager getValidationManager(LifecycleManager lifecycleManager,
|
||||
EventBus eventBus, ValidationManagerImpl validationManager) {
|
||||
ValidationManager provideValidationManager(
|
||||
LifecycleManager lifecycleManager, EventBus eventBus,
|
||||
ValidationManagerImpl validationManager) {
|
||||
lifecycleManager.registerService(validationManager);
|
||||
eventBus.addListener(validationManager);
|
||||
return validationManager;
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@ValidationExecutor
|
||||
Executor provideValidationExecutor(
|
||||
@CryptoExecutor Executor cryptoExecutor) {
|
||||
return new PoliteExecutor("ValidationExecutor", cryptoExecutor,
|
||||
MAX_CONCURRENT_VALIDATION_TASKS);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package org.briarproject.bramble.sync;
|
||||
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
import javax.inject.Qualifier;
|
||||
|
||||
import static java.lang.annotation.ElementType.FIELD;
|
||||
import static java.lang.annotation.ElementType.METHOD;
|
||||
import static java.lang.annotation.ElementType.PARAMETER;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
|
||||
/**
|
||||
* Annotation for injecting the executor for validation tasks. Also used for
|
||||
* annotating methods that should run on the validation executor.
|
||||
* <p>
|
||||
* The contract of this executor is that tasks may be run concurrently, and
|
||||
* submitting a task will never block. Tasks must not run indefinitely. Tasks
|
||||
* submitted during shutdown are discarded.
|
||||
*/
|
||||
@Qualifier
|
||||
@Target({FIELD, METHOD, PARAMETER})
|
||||
@Retention(RUNTIME)
|
||||
@interface ValidationExecutor {
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.briarproject.bramble.sync;
|
||||
|
||||
import org.briarproject.bramble.api.crypto.CryptoExecutor;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
@@ -50,8 +49,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
Logger.getLogger(ValidationManagerImpl.class.getName());
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
private final Executor cryptoExecutor;
|
||||
private final Executor dbExecutor, validationExecutor;
|
||||
private final MessageFactory messageFactory;
|
||||
private final Map<ClientId, MessageValidator> validators;
|
||||
private final Map<ClientId, IncomingMessageHook> hooks;
|
||||
@@ -60,11 +58,11 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
@Inject
|
||||
ValidationManagerImpl(DatabaseComponent db,
|
||||
@DatabaseExecutor Executor dbExecutor,
|
||||
@CryptoExecutor Executor cryptoExecutor,
|
||||
@ValidationExecutor Executor validationExecutor,
|
||||
MessageFactory messageFactory) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.cryptoExecutor = cryptoExecutor;
|
||||
this.validationExecutor = validationExecutor;
|
||||
this.messageFactory = messageFactory;
|
||||
validators = new ConcurrentHashMap<ClientId, MessageValidator>();
|
||||
hooks = new ConcurrentHashMap<ClientId, IncomingMessageHook>();
|
||||
@@ -104,6 +102,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void validateOutstandingMessages(ClientId c) {
|
||||
try {
|
||||
Queue<MessageId> unvalidated = new LinkedList<MessageId>();
|
||||
@@ -130,6 +129,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void validateNextMessage(Queue<MessageId> unvalidated) {
|
||||
try {
|
||||
Message m;
|
||||
@@ -167,6 +167,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void deliverOutstandingMessages(ClientId c) {
|
||||
try {
|
||||
Queue<MessageId> pending = new LinkedList<MessageId>();
|
||||
@@ -194,6 +195,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void deliverNextPendingMessage(Queue<MessageId> pending) {
|
||||
try {
|
||||
boolean anyInvalid = false, allDelivered = true;
|
||||
@@ -220,8 +222,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
Message m = messageFactory.createMessage(id, raw);
|
||||
Group g = db.getGroup(txn, m.getGroupId());
|
||||
ClientId c = g.getClientId();
|
||||
Metadata meta = db.getMessageMetadataForValidator(txn,
|
||||
id);
|
||||
Metadata meta =
|
||||
db.getMessageMetadataForValidator(txn, id);
|
||||
DeliveryResult result = deliverMessage(txn, m, c, meta);
|
||||
if (result.valid) {
|
||||
pending.addAll(getPendingDependents(txn, id));
|
||||
@@ -240,8 +242,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
db.endTransaction(txn);
|
||||
}
|
||||
if (invalidate != null) invalidateNextMessageAsync(invalidate);
|
||||
deliverNextPendingMessageAsync(pending);
|
||||
if (toShare != null) shareNextMessageAsync(toShare);
|
||||
deliverNextPendingMessageAsync(pending);
|
||||
} catch (NoSuchMessageException e) {
|
||||
LOG.info("Message removed before delivery");
|
||||
deliverNextPendingMessageAsync(pending);
|
||||
@@ -249,13 +251,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
LOG.info("Group removed before delivery");
|
||||
deliverNextPendingMessageAsync(pending);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void validateMessageAsync(final Message m, final Group g) {
|
||||
cryptoExecutor.execute(new Runnable() {
|
||||
validationExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
validateMessage(m, g);
|
||||
@@ -263,10 +264,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@ValidationExecutor
|
||||
private void validateMessage(Message m, Group g) {
|
||||
MessageValidator v = validators.get(g.getClientId());
|
||||
if (v == null) {
|
||||
LOG.warning("No validator");
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.warning("No validator for " + g.getClientId().getString());
|
||||
} else {
|
||||
try {
|
||||
MessageContext context = v.validateMessage(m, g);
|
||||
@@ -291,6 +294,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void storeMessageContext(Message m, ClientId c,
|
||||
MessageContext context) {
|
||||
try {
|
||||
@@ -353,6 +357,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
}
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private DeliveryResult deliverMessage(Transaction txn, Message m,
|
||||
ClientId c, Metadata meta) throws DbException {
|
||||
// Deliver the message to the client if it's registered a hook
|
||||
@@ -362,10 +367,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
try {
|
||||
shareMsg = hook.incomingMessage(txn, m, meta);
|
||||
} catch (InvalidMessageException e) {
|
||||
// message is invalid, mark it as such and delete it
|
||||
db.setMessageState(txn, m.getId(), INVALID);
|
||||
db.deleteMessageMetadata(txn, m.getId());
|
||||
db.deleteMessage(txn, m.getId());
|
||||
invalidateMessage(txn, m.getId());
|
||||
return new DeliveryResult(false, false);
|
||||
}
|
||||
}
|
||||
@@ -373,6 +375,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
return new DeliveryResult(true, shareMsg);
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m)
|
||||
throws DbException {
|
||||
Queue<MessageId> pending = new LinkedList<MessageId>();
|
||||
@@ -392,6 +395,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void shareOutstandingMessages(ClientId c) {
|
||||
try {
|
||||
Queue<MessageId> toShare = new LinkedList<MessageId>();
|
||||
@@ -424,6 +428,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void shareNextMessage(Queue<MessageId> toShare) {
|
||||
try {
|
||||
Transaction txn = db.startTransaction(false);
|
||||
@@ -457,6 +462,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void invalidateNextMessage(Queue<MessageId> invalidate) {
|
||||
try {
|
||||
Transaction txn = db.startTransaction(false);
|
||||
@@ -479,6 +485,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
}
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void invalidateMessage(Transaction txn, MessageId m)
|
||||
throws DbException {
|
||||
db.setMessageState(txn, m, INVALID);
|
||||
@@ -486,6 +493,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
db.deleteMessageMetadata(txn, m);
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private Queue<MessageId> getDependentsToInvalidate(Transaction txn,
|
||||
MessageId m) throws DbException {
|
||||
Queue<MessageId> invalidate = new LinkedList<MessageId>();
|
||||
@@ -515,6 +523,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
});
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void loadGroupAndValidate(final Message m) {
|
||||
try {
|
||||
Group g;
|
||||
@@ -534,6 +543,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
||||
}
|
||||
|
||||
private static class DeliveryResult {
|
||||
|
||||
private final boolean valid, share;
|
||||
|
||||
private DeliveryResult(boolean valid, boolean share) {
|
||||
|
||||
Reference in New Issue
Block a user