From 4123f4a5ce9ad8441991b218c09152ce43bf2317 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 12 Jan 2017 12:16:13 +0000 Subject: [PATCH 1/5] Log time spent queueing and executing crypto and DB tasks. --- .../bramble/TimeLoggingExecutor.java | 47 +++++++++++++++++++ .../bramble/crypto/CryptoModule.java | 5 +- .../bramble/db/DatabaseComponentImpl.java | 7 +++ .../bramble/db/DatabaseExecutorModule.java | 5 +- 4 files changed, 60 insertions(+), 4 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java b/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java new file mode 100644 index 000000000..77823e5d0 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java @@ -0,0 +1,47 @@ +package org.briarproject.bramble; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.util.logging.Level.FINE; + +public class TimeLoggingExecutor extends ThreadPoolExecutor { + + private static final Level LOG_LEVEL = FINE; + + private final Logger log; + + public TimeLoggingExecutor(String tag, int corePoolSize, int maxPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { + super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, + handler); + log = Logger.getLogger(tag); + } + + @Override + public void execute(final Runnable r) { + final long submitted = System.currentTimeMillis(); + super.execute(new Runnable() { + @Override + public void run() { + long started = System.currentTimeMillis(); + if (log.isLoggable(LOG_LEVEL)) { + long duration = started - submitted; + log.log(LOG_LEVEL, "Queue time " + duration + " ms"); + } + r.run(); + long finished = System.currentTimeMillis(); + if (log.isLoggable(LOG_LEVEL)) { + long duration = finished - started; + log.log(LOG_LEVEL, "Execution time " + duration + " ms"); + } + } + }); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java index 6b3ea5295..1ac3cc29c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java @@ -1,5 +1,6 @@ package org.briarproject.bramble.crypto; +import org.briarproject.bramble.TimeLoggingExecutor; import org.briarproject.bramble.api.crypto.CryptoComponent; import org.briarproject.bramble.api.crypto.CryptoExecutor; import org.briarproject.bramble.api.crypto.PasswordStrengthEstimator; @@ -49,8 +50,8 @@ public class CryptoModule { RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); // Create a limited # of threads and keep them in the pool for 60 secs - cryptoExecutor = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS, - 60, SECONDS, queue, policy); + cryptoExecutor = new TimeLoggingExecutor("CryptoExecutor", 0, + MAX_EXECUTOR_THREADS, 60, SECONDS, queue, policy); } @Provides diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 8d9be1709..dd8dea976 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -67,6 +67,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; +import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; @@ -130,8 +131,14 @@ class DatabaseComponentImpl implements DatabaseComponent { // Don't allow reentrant locking if (lock.getReadHoldCount() > 0) throw new IllegalStateException(); if (lock.getWriteHoldCount() > 0) throw new IllegalStateException(); + long start = System.currentTimeMillis(); if (readOnly) lock.readLock().lock(); else lock.writeLock().lock(); + if (LOG.isLoggable(FINE)) { + long duration = System.currentTimeMillis() - start; + if (readOnly) LOG.fine("Waited " + duration + " ms for read lock"); + else LOG.fine("Waited " + duration + " ms for write lock"); + } try { return new Transaction(db.startTransaction(), readOnly); } catch (DbException e) { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseExecutorModule.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseExecutorModule.java index 7865faf36..4c11098ff 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseExecutorModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseExecutorModule.java @@ -1,5 +1,6 @@ package org.briarproject.bramble.db; +import org.briarproject.bramble.TimeLoggingExecutor; import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.lifecycle.LifecycleManager; @@ -36,8 +37,8 @@ public class DatabaseExecutorModule { RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); // Use a single thread and keep it in the pool for 60 secs - databaseExecutor = new ThreadPoolExecutor(0, 1, 60, SECONDS, queue, - policy); + databaseExecutor = new TimeLoggingExecutor("DatabaseExecutor", 0, 1, + 60, SECONDS, queue, policy); } @Provides From 0c085f139ac40cd7e81314ed944f05da6ca5ec7e Mon Sep 17 00:00:00 2001 From: akwizgran Date: Mon, 16 Jan 2017 16:11:29 +0000 Subject: [PATCH 2/5] Added "polite" delegating executor. --- .../briarproject/bramble/PoliteExecutor.java | 84 +++++++++++ .../bramble/TimeLoggingExecutor.java | 34 +++-- .../bramble/crypto/CryptoModule.java | 11 +- .../bramble/PoliteExecutorTest.java | 142 ++++++++++++++++++ 4 files changed, 253 insertions(+), 18 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/PoliteExecutor.java create mode 100644 bramble-core/src/test/java/org/briarproject/bramble/PoliteExecutorTest.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/PoliteExecutor.java b/bramble-core/src/main/java/org/briarproject/bramble/PoliteExecutor.java new file mode 100644 index 000000000..02f88c7c1 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/PoliteExecutor.java @@ -0,0 +1,84 @@ +package org.briarproject.bramble; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.concurrent.GuardedBy; + +import static java.util.logging.Level.FINE; + +/** + * An {@link Executor} that delegates its tasks to another {@link Executor} + * while limiting the number of tasks that are delegated concurrently. Tasks + * are delegated in the order they are submitted to this executor. + */ +@NotNullByDefault +public class PoliteExecutor implements Executor { + + private static final Level LOG_LEVEL = FINE; + + private final Object lock = new Object(); + @GuardedBy("lock") + private final Queue queue = new LinkedList(); + private final Executor delegate; + private final int maxConcurrentTasks; + private final Logger log; + + @GuardedBy("lock") + private int concurrentTasks = 0; + + /** + * @param tag the tag to be used for logging + * @param delegate the executor to which tasks will be delegated + * @param maxConcurrentTasks the maximum number of tasks that will be + * delegated concurrently. If this is set to 1, tasks submitted to this + * executor will run in the order they are submitted and will not run + * concurrently + */ + public PoliteExecutor(String tag, Executor delegate, + int maxConcurrentTasks) { + this.delegate = delegate; + this.maxConcurrentTasks = maxConcurrentTasks; + log = Logger.getLogger(tag); + } + + @Override + public void execute(final Runnable r) { + final long submitted = System.currentTimeMillis(); + Runnable wrapped = new Runnable() { + @Override + public void run() { + if (log.isLoggable(LOG_LEVEL)) { + long queued = System.currentTimeMillis() - submitted; + log.log(LOG_LEVEL, "Queue time " + queued + " ms"); + } + try { + r.run(); + } finally { + scheduleNext(); + } + } + }; + synchronized (lock) { + if (concurrentTasks < maxConcurrentTasks) { + concurrentTasks++; + delegate.execute(wrapped); + } else { + queue.add(wrapped); + } + } + } + + private void scheduleNext() { + synchronized (lock) { + Runnable next = queue.poll(); + if (next == null) concurrentTasks--; + else delegate.execute(next); + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java b/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java index 77823e5d0..e7385f59b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java @@ -1,5 +1,7 @@ package org.briarproject.bramble; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; @@ -9,6 +11,7 @@ import java.util.logging.Logger; import static java.util.logging.Level.FINE; +@NotNullByDefault public class TimeLoggingExecutor extends ThreadPoolExecutor { private static final Level LOG_LEVEL = FINE; @@ -26,22 +29,21 @@ public class TimeLoggingExecutor extends ThreadPoolExecutor { @Override public void execute(final Runnable r) { - final long submitted = System.currentTimeMillis(); - super.execute(new Runnable() { - @Override - public void run() { - long started = System.currentTimeMillis(); - if (log.isLoggable(LOG_LEVEL)) { - long duration = started - submitted; - log.log(LOG_LEVEL, "Queue time " + duration + " ms"); + if (log.isLoggable(LOG_LEVEL)) { + final long submitted = System.currentTimeMillis(); + super.execute(new Runnable() { + @Override + public void run() { + long started = System.currentTimeMillis(); + long queued = started - submitted; + log.log(LOG_LEVEL, "Queue time " + queued + " ms"); + r.run(); + long executing = System.currentTimeMillis() - started; + log.log(LOG_LEVEL, "Execution time " + executing + " ms"); } - r.run(); - long finished = System.currentTimeMillis(); - if (log.isLoggable(LOG_LEVEL)) { - long duration = finished - started; - log.log(LOG_LEVEL, "Execution time " + duration + " ms"); - } - } - }); + }); + } else { + super.execute(r); + } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java index 1ac3cc29c..bcad98cda 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java @@ -32,7 +32,7 @@ public class CryptoModule { public static class EagerSingletons { @Inject @CryptoExecutor - Executor cryptoExecutor; + ExecutorService cryptoExecutor; } /** @@ -86,11 +86,18 @@ public class CryptoModule { @Provides @Singleton @CryptoExecutor - Executor getCryptoExecutor(LifecycleManager lifecycleManager) { + ExecutorService getCryptoExecutorService( + LifecycleManager lifecycleManager) { lifecycleManager.registerForShutdown(cryptoExecutor); return cryptoExecutor; } + @Provides + @CryptoExecutor + Executor getCryptoExecutor() { + return cryptoExecutor; + } + @Provides SecureRandom getSecureRandom(CryptoComponent crypto) { return crypto.getSecureRandom(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/PoliteExecutorTest.java b/bramble-core/src/test/java/org/briarproject/bramble/PoliteExecutorTest.java new file mode 100644 index 000000000..426c4cc1d --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/PoliteExecutorTest.java @@ -0,0 +1,142 @@ +package org.briarproject.bramble; + +import org.briarproject.bramble.test.BrambleTestCase; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class PoliteExecutorTest extends BrambleTestCase { + + private static final String TAG = "Test"; + private static final int TASKS = 10; + + @Test + public void testTasksAreDelegatedInOrderOfSubmission() throws Exception { + // Delegate to a single-threaded executor + Executor delegate = Executors.newSingleThreadExecutor(); + // Allow all the tasks to be delegated straight away + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2); + final List list = new Vector(); + final CountDownLatch latch = new CountDownLatch(TASKS); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + list.add(result); + latch.countDown(); + } + }); + } + // Wait for all the tasks to finish + latch.await(); + // The tasks should have run in the order they were submitted + assertEquals(ascendingOrder(), list); + } + + @Test + public void testQueuedTasksAreDelegatedInOrderOfSubmission() + throws Exception { + // Delegate to a single-threaded executor + Executor delegate = Executors.newSingleThreadExecutor(); + // Allow two tasks to be delegated at a time + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 2); + final List list = new Vector(); + final CountDownLatch latch = new CountDownLatch(TASKS); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + list.add(result); + latch.countDown(); + } + }); + } + // Wait for all the tasks to finish + latch.await(); + // The tasks should have run in the order they were submitted + assertEquals(ascendingOrder(), list); + } + + @Test + public void testTasksRunInParallelOnDelegate() throws Exception { + // Delegate to a multi-threaded executor + Executor delegate = Executors.newCachedThreadPool(); + // Allow all the tasks to be delegated straight away + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2); + final List list = new Vector(); + final CountDownLatch[] latches = new CountDownLatch[TASKS]; + for (int i = 0; i < TASKS; i++) latches[i] = new CountDownLatch(1); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + try { + // Each task waits for the next task, if any, to finish + if (result < TASKS - 1) latches[result + 1].await(); + list.add(result); + } catch (InterruptedException e) { + fail(); + } + latches[result].countDown(); + } + }); + } + // Wait for all the tasks to finish + for (int i = 0; i < TASKS; i++) latches[i].await(); + // The tasks should have finished in reverse order + assertEquals(descendingOrder(), list); + } + + @Test + public void testTasksDoNotRunInParallelOnDelegate() throws Exception { + // Delegate to a multi-threaded executor + Executor delegate = Executors.newCachedThreadPool(); + // Allow one task to be delegated at a time + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 1); + final List list = new Vector(); + final CountDownLatch latch = new CountDownLatch(TASKS); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + try { + // Each task runs faster than the previous task + Thread.sleep(TASKS - result); + list.add(result); + } catch (InterruptedException e) { + fail(); + } + latch.countDown(); + } + }); + } + // Wait for all the tasks to finish + latch.await(); + // The tasks should have finished in the order they were submitted + assertEquals(ascendingOrder(), list); + } + + private List ascendingOrder() { + Integer[] array = new Integer[TASKS]; + for (int i = 0; i < TASKS; i++) array[i] = i; + return Arrays.asList(array); + } + + private List descendingOrder() { + Integer[] array = new Integer[TASKS]; + for (int i = 0; i < TASKS; i++) array[i] = TASKS - 1 - i; + return Arrays.asList(array); + } +} From d381e25e86b457eb9b185ab2e1714a1f2da4ce09 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Wed, 5 Apr 2017 14:03:40 +0100 Subject: [PATCH 3/5] Limit the number of validation tasks on the crypto executor. --- .../briarproject/bramble/sync/SyncModule.java | 23 +- .../bramble/sync/ValidationExecutor.java | 25 ++ .../bramble/sync/ValidationManagerImpl.java | 42 ++- .../sync/ValidationManagerImplTest.java | 357 ++++-------------- 4 files changed, 140 insertions(+), 307 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationExecutor.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java index 09aaa199b..d4cc88881 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java @@ -1,6 +1,8 @@ package org.briarproject.bramble.sync; +import org.briarproject.bramble.PoliteExecutor; import org.briarproject.bramble.api.crypto.CryptoComponent; +import org.briarproject.bramble.api.crypto.CryptoExecutor; import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.event.EventBus; @@ -29,6 +31,13 @@ public class SyncModule { ValidationManager validationManager; } + /** + * The maximum number of validation tasks to delegate to the crypto + * executor concurrently. + */ + private static final int MAX_CONCURRENT_VALIDATION_TASKS = + Runtime.getRuntime().availableProcessors(); + @Provides GroupFactory provideGroupFactory(CryptoComponent crypto) { return new GroupFactoryImpl(crypto); @@ -62,10 +71,20 @@ public class SyncModule { @Provides @Singleton - ValidationManager getValidationManager(LifecycleManager lifecycleManager, - EventBus eventBus, ValidationManagerImpl validationManager) { + ValidationManager provideValidationManager( + LifecycleManager lifecycleManager, EventBus eventBus, + ValidationManagerImpl validationManager) { lifecycleManager.registerService(validationManager); eventBus.addListener(validationManager); return validationManager; } + + @Provides + @Singleton + @ValidationExecutor + Executor provideValidationExecutor( + @CryptoExecutor Executor cryptoExecutor) { + return new PoliteExecutor("ValidationExecutor", cryptoExecutor, + MAX_CONCURRENT_VALIDATION_TASKS); + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationExecutor.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationExecutor.java new file mode 100644 index 000000000..41cda1d35 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationExecutor.java @@ -0,0 +1,25 @@ +package org.briarproject.bramble.sync; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import javax.inject.Qualifier; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Annotation for injecting the executor for validation tasks. Also used for + * annotating methods that should run on the validation executor. + *

+ * The contract of this executor is that tasks may be run concurrently, and + * submitting a task will never block. Tasks must not run indefinitely. Tasks + * submitted during shutdown are discarded. + */ +@Qualifier +@Target({FIELD, METHOD, PARAMETER}) +@Retention(RUNTIME) +@interface ValidationExecutor { +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationManagerImpl.java index 7e7aa865f..888890864 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/ValidationManagerImpl.java @@ -1,6 +1,5 @@ package org.briarproject.bramble.sync; -import org.briarproject.bramble.api.crypto.CryptoExecutor; import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DbException; @@ -50,8 +49,7 @@ class ValidationManagerImpl implements ValidationManager, Service, Logger.getLogger(ValidationManagerImpl.class.getName()); private final DatabaseComponent db; - private final Executor dbExecutor; - private final Executor cryptoExecutor; + private final Executor dbExecutor, validationExecutor; private final MessageFactory messageFactory; private final Map validators; private final Map hooks; @@ -60,11 +58,11 @@ class ValidationManagerImpl implements ValidationManager, Service, @Inject ValidationManagerImpl(DatabaseComponent db, @DatabaseExecutor Executor dbExecutor, - @CryptoExecutor Executor cryptoExecutor, + @ValidationExecutor Executor validationExecutor, MessageFactory messageFactory) { this.db = db; this.dbExecutor = dbExecutor; - this.cryptoExecutor = cryptoExecutor; + this.validationExecutor = validationExecutor; this.messageFactory = messageFactory; validators = new ConcurrentHashMap(); hooks = new ConcurrentHashMap(); @@ -104,6 +102,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void validateOutstandingMessages(ClientId c) { try { Queue unvalidated = new LinkedList(); @@ -130,6 +129,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void validateNextMessage(Queue unvalidated) { try { Message m; @@ -167,6 +167,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void deliverOutstandingMessages(ClientId c) { try { Queue pending = new LinkedList(); @@ -194,6 +195,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void deliverNextPendingMessage(Queue pending) { try { boolean anyInvalid = false, allDelivered = true; @@ -220,8 +222,8 @@ class ValidationManagerImpl implements ValidationManager, Service, Message m = messageFactory.createMessage(id, raw); Group g = db.getGroup(txn, m.getGroupId()); ClientId c = g.getClientId(); - Metadata meta = db.getMessageMetadataForValidator(txn, - id); + Metadata meta = + db.getMessageMetadataForValidator(txn, id); DeliveryResult result = deliverMessage(txn, m, c, meta); if (result.valid) { pending.addAll(getPendingDependents(txn, id)); @@ -240,8 +242,8 @@ class ValidationManagerImpl implements ValidationManager, Service, db.endTransaction(txn); } if (invalidate != null) invalidateNextMessageAsync(invalidate); - deliverNextPendingMessageAsync(pending); if (toShare != null) shareNextMessageAsync(toShare); + deliverNextPendingMessageAsync(pending); } catch (NoSuchMessageException e) { LOG.info("Message removed before delivery"); deliverNextPendingMessageAsync(pending); @@ -249,13 +251,12 @@ class ValidationManagerImpl implements ValidationManager, Service, LOG.info("Group removed before delivery"); deliverNextPendingMessageAsync(pending); } catch (DbException e) { - if (LOG.isLoggable(WARNING)) - LOG.log(WARNING, e.toString(), e); + if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } } private void validateMessageAsync(final Message m, final Group g) { - cryptoExecutor.execute(new Runnable() { + validationExecutor.execute(new Runnable() { @Override public void run() { validateMessage(m, g); @@ -263,10 +264,12 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @ValidationExecutor private void validateMessage(Message m, Group g) { MessageValidator v = validators.get(g.getClientId()); if (v == null) { - LOG.warning("No validator"); + if (LOG.isLoggable(WARNING)) + LOG.warning("No validator for " + g.getClientId().getString()); } else { try { MessageContext context = v.validateMessage(m, g); @@ -291,6 +294,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void storeMessageContext(Message m, ClientId c, MessageContext context) { try { @@ -353,6 +357,7 @@ class ValidationManagerImpl implements ValidationManager, Service, } } + @DatabaseExecutor private DeliveryResult deliverMessage(Transaction txn, Message m, ClientId c, Metadata meta) throws DbException { // Deliver the message to the client if it's registered a hook @@ -362,10 +367,7 @@ class ValidationManagerImpl implements ValidationManager, Service, try { shareMsg = hook.incomingMessage(txn, m, meta); } catch (InvalidMessageException e) { - // message is invalid, mark it as such and delete it - db.setMessageState(txn, m.getId(), INVALID); - db.deleteMessageMetadata(txn, m.getId()); - db.deleteMessage(txn, m.getId()); + invalidateMessage(txn, m.getId()); return new DeliveryResult(false, false); } } @@ -373,6 +375,7 @@ class ValidationManagerImpl implements ValidationManager, Service, return new DeliveryResult(true, shareMsg); } + @DatabaseExecutor private Queue getPendingDependents(Transaction txn, MessageId m) throws DbException { Queue pending = new LinkedList(); @@ -392,6 +395,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void shareOutstandingMessages(ClientId c) { try { Queue toShare = new LinkedList(); @@ -424,6 +428,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void shareNextMessage(Queue toShare) { try { Transaction txn = db.startTransaction(false); @@ -457,6 +462,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void invalidateNextMessage(Queue invalidate) { try { Transaction txn = db.startTransaction(false); @@ -479,6 +485,7 @@ class ValidationManagerImpl implements ValidationManager, Service, } } + @DatabaseExecutor private void invalidateMessage(Transaction txn, MessageId m) throws DbException { db.setMessageState(txn, m, INVALID); @@ -486,6 +493,7 @@ class ValidationManagerImpl implements ValidationManager, Service, db.deleteMessageMetadata(txn, m); } + @DatabaseExecutor private Queue getDependentsToInvalidate(Transaction txn, MessageId m) throws DbException { Queue invalidate = new LinkedList(); @@ -515,6 +523,7 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + @DatabaseExecutor private void loadGroupAndValidate(final Message m) { try { Group g; @@ -534,6 +543,7 @@ class ValidationManagerImpl implements ValidationManager, Service, } private static class DeliveryResult { + private final boolean valid, share; private DeliveryResult(boolean valid, boolean share) { diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/ValidationManagerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/ValidationManagerImplTest.java index b99c2f062..ed6c361de 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/ValidationManagerImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/ValidationManagerImplTest.java @@ -19,12 +19,12 @@ import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook; import org.briarproject.bramble.api.sync.ValidationManager.MessageValidator; import org.briarproject.bramble.api.sync.ValidationManager.State; import org.briarproject.bramble.api.sync.event.MessageAddedEvent; -import org.briarproject.bramble.test.BrambleTestCase; +import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.ImmediateExecutor; import org.briarproject.bramble.test.TestUtils; import org.briarproject.bramble.util.ByteUtils; import org.jmock.Expectations; -import org.jmock.Mockery; +import org.junit.Before; import org.junit.Test; import java.util.Arrays; @@ -38,8 +38,18 @@ import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID; import static org.briarproject.bramble.api.sync.ValidationManager.State.PENDING; import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN; -public class ValidationManagerImplTest extends BrambleTestCase { +public class ValidationManagerImplTest extends BrambleMockTestCase { + private final DatabaseComponent db = context.mock(DatabaseComponent.class); + private final MessageFactory messageFactory = + context.mock(MessageFactory.class); + private final MessageValidator validator = + context.mock(MessageValidator.class); + private final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + + private final Executor dbExecutor = new ImmediateExecutor(); + private final Executor validationExecutor = new ImmediateExecutor(); private final ClientId clientId = new ClientId(TestUtils.getRandomString(5)); private final MessageId messageId = new MessageId(TestUtils.getRandomId()); @@ -63,23 +73,58 @@ public class ValidationManagerImplTest extends BrambleTestCase { private final MessageContext validResultWithDependencies = new MessageContext(metadata, Collections.singletonList(messageId1)); + private ValidationManagerImpl vm; + public ValidationManagerImplTest() { // Encode the messages System.arraycopy(groupId.getBytes(), 0, raw, 0, UniqueId.LENGTH); ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH); } + @Before + public void setUp() { + vm = new ValidationManagerImpl(db, dbExecutor, validationExecutor, + messageFactory); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + } + + @Test + public void testStartAndStop() throws Exception { + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, true); + final Transaction txn2 = new Transaction(null, true); + + context.checking(new Expectations() {{ + // validateOutstandingMessages() + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getMessagesToValidate(txn, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).commitTransaction(txn); + oneOf(db).endTransaction(txn); + // deliverOutstandingMessages() + oneOf(db).startTransaction(true); + will(returnValue(txn1)); + oneOf(db).getPendingMessages(txn1, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).commitTransaction(txn1); + oneOf(db).endTransaction(txn1); + // shareOutstandingMessages() + oneOf(db).startTransaction(true); + will(returnValue(txn2)); + oneOf(db).getMessagesToShare(txn2, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).commitTransaction(txn2); + oneOf(db).endTransaction(txn2); + }}); + + vm.startService(); + vm.stopService(); + } + @Test public void testMessagesAreValidatedAtStartup() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, false); @@ -87,6 +132,7 @@ public class ValidationManagerImplTest extends BrambleTestCase { final Transaction txn4 = new Transaction(null, false); final Transaction txn5 = new Transaction(null, true); final Transaction txn6 = new Transaction(null, true); + context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -165,26 +211,11 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn6); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.startService(); - - context.assertIsSatisfied(); } @Test public void testPendingMessagesAreDeliveredAtStartup() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, false); @@ -266,26 +297,11 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn4); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.startService(); - - context.assertIsSatisfied(); } @Test public void testMessagesAreSharedAtStartup() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, true); @@ -333,29 +349,15 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn4); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.startService(); - - context.assertIsSatisfied(); } @Test public void testIncomingMessagesAreShared() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, false); final Transaction txn2 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -396,33 +398,19 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn2); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } @Test public void testValidationContinuesAfterNoSuchMessageException() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, true); final Transaction txn3 = new Transaction(null, false); final Transaction txn4 = new Transaction(null, true); final Transaction txn5 = new Transaction(null, true); + context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -481,33 +469,19 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn5); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.startService(); - - context.assertIsSatisfied(); } @Test public void testValidationContinuesAfterNoSuchGroupException() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, true); final Transaction txn3 = new Transaction(null, false); final Transaction txn4 = new Transaction(null, true); final Transaction txn5 = new Transaction(null, true); + context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -571,28 +545,14 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn5); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.startService(); - - context.assertIsSatisfied(); } @Test public void testNonLocalMessagesAreValidatedWhenAdded() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -619,51 +579,20 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn1); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } @Test public void testLocalMessagesAreNotValidatedWhenAdded() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); - - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, null)); - - context.assertIsSatisfied(); } @Test public void testMessagesWithUndeliveredDependenciesArePending() throws Exception { - - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -688,29 +617,15 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn1); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } @Test public void testMessagesWithDeliveredDependenciesGetDelivered() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -741,30 +656,16 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn1); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } @Test public void testMessagesWithInvalidDependenciesAreInvalid() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, false); final Transaction txn2 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -809,26 +710,11 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn2); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } @Test public void testRecursiveInvalidation() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final MessageId messageId3 = new MessageId(TestUtils.getRandomId()); final MessageId messageId4 = new MessageId(TestUtils.getRandomId()); final Map twoDependents = @@ -842,6 +728,7 @@ public class ValidationManagerImplTest extends BrambleTestCase { final Transaction txn4 = new Transaction(null, false); final Transaction txn5 = new Transaction(null, false); final Transaction txn6 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -927,26 +814,11 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn6); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } @Test public void testPendingDependentsGetDelivered() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final MessageId messageId3 = new MessageId(TestUtils.getRandomId()); final MessageId messageId4 = new MessageId(TestUtils.getRandomId()); final Message message3 = new Message(messageId3, groupId, timestamp, @@ -968,6 +840,7 @@ public class ValidationManagerImplTest extends BrambleTestCase { final Transaction txn4 = new Transaction(null, false); final Transaction txn5 = new Transaction(null, false); final Transaction txn6 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -1100,26 +973,11 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn6); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } @Test public void testOnlyReadyPendingDependentsGetDelivered() throws Exception { - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); final Map twoDependencies = new LinkedHashMap(); twoDependencies.put(messageId, DELIVERED); @@ -1127,6 +985,7 @@ public class ValidationManagerImplTest extends BrambleTestCase { final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, false); final Transaction txn2 = new Transaction(null, false); + context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -1162,86 +1021,6 @@ public class ValidationManagerImplTest extends BrambleTestCase { oneOf(db).endTransaction(txn2); }}); - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); vm.eventOccurred(new MessageAddedEvent(message, contactId)); - - context.assertIsSatisfied(); } - - @Test - public void testMessageDependencyCycle() throws Exception { - final MessageContext cycleContext = new MessageContext(metadata, - Collections.singletonList(messageId)); - - Mockery context = new Mockery(); - final DatabaseComponent db = context.mock(DatabaseComponent.class); - final Executor dbExecutor = new ImmediateExecutor(); - final Executor cryptoExecutor = new ImmediateExecutor(); - final MessageFactory messageFactory = - context.mock(MessageFactory.class); - final MessageValidator validator = context.mock(MessageValidator.class); - final IncomingMessageHook hook = - context.mock(IncomingMessageHook.class); - final Transaction txn = new Transaction(null, true); - final Transaction txn1 = new Transaction(null, false); - final Transaction txn2 = new Transaction(null, true); - final Transaction txn3 = new Transaction(null, false); - context.checking(new Expectations() {{ - // Load the group - oneOf(db).startTransaction(true); - will(returnValue(txn)); - oneOf(db).getGroup(txn, groupId); - will(returnValue(group)); - oneOf(db).commitTransaction(txn); - oneOf(db).endTransaction(txn); - // Validate the message: valid - oneOf(validator).validateMessage(message, group); - will(returnValue(validResultWithDependencies)); - // Store the validation result - oneOf(db).startTransaction(false); - will(returnValue(txn1)); - oneOf(db).addMessageDependencies(txn1, message, - validResultWithDependencies.getDependencies()); - oneOf(db).getMessageDependencies(txn1, messageId); - will(returnValue(Collections.singletonMap(messageId1, UNKNOWN))); - oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); - oneOf(db).setMessageState(txn1, messageId, PENDING); - oneOf(db).commitTransaction(txn1); - oneOf(db).endTransaction(txn1); - // Second message is coming in - oneOf(db).startTransaction(true); - will(returnValue(txn2)); - oneOf(db).getGroup(txn2, groupId); - will(returnValue(group)); - oneOf(db).commitTransaction(txn2); - oneOf(db).endTransaction(txn2); - // Validate the message: valid - oneOf(validator).validateMessage(message1, group); - will(returnValue(cycleContext)); - // Store the validation result - oneOf(db).startTransaction(false); - will(returnValue(txn3)); - oneOf(db).addMessageDependencies(txn3, message1, - cycleContext.getDependencies()); - oneOf(db).getMessageDependencies(txn3, messageId1); - will(returnValue(Collections.singletonMap(messageId, PENDING))); - oneOf(db).mergeMessageMetadata(txn3, messageId1, metadata); - oneOf(db).setMessageState(txn3, messageId1, PENDING); - oneOf(db).commitTransaction(txn3); - oneOf(db).endTransaction(txn3); - }}); - - ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, - cryptoExecutor, messageFactory); - vm.registerMessageValidator(clientId, validator); - vm.registerIncomingMessageHook(clientId, hook); - vm.eventOccurred(new MessageAddedEvent(message, contactId)); - vm.eventOccurred(new MessageAddedEvent(message1, contactId)); - - context.assertIsSatisfied(); - } - } From 3aa4644339d11d419c8cad1f7c316e3d3e8fcc06 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 6 Apr 2017 11:36:02 +0100 Subject: [PATCH 4/5] If we have multiple cores, leave one free from crypto tasks. --- .../main/java/org/briarproject/bramble/crypto/CryptoModule.java | 2 +- .../src/main/java/org/briarproject/bramble/sync/SyncModule.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java index bcad98cda..8f0299441 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java @@ -39,7 +39,7 @@ public class CryptoModule { * The maximum number of executor threads. */ private static final int MAX_EXECUTOR_THREADS = - Runtime.getRuntime().availableProcessors(); + Math.max(1, Runtime.getRuntime().availableProcessors() - 1); private final ExecutorService cryptoExecutor; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java index d4cc88881..eea0e2d3b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java @@ -36,7 +36,7 @@ public class SyncModule { * executor concurrently. */ private static final int MAX_CONCURRENT_VALIDATION_TASKS = - Runtime.getRuntime().availableProcessors(); + Math.max(1, Runtime.getRuntime().availableProcessors() - 1); @Provides GroupFactory provideGroupFactory(CryptoComponent crypto) { From fb85ecf07be22bbf6307593b5fd394c75cd969e2 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 6 Apr 2017 15:34:39 +0100 Subject: [PATCH 5/5] Added note about number of available processors changing. --- .../java/org/briarproject/bramble/crypto/CryptoModule.java | 3 +++ .../main/java/org/briarproject/bramble/sync/SyncModule.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java index 8f0299441..5ab4becce 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java @@ -37,6 +37,9 @@ public class CryptoModule { /** * The maximum number of executor threads. + *

+ * The number of available processors can change during the lifetime of the + * JVM, so this is just a reasonable guess. */ private static final int MAX_EXECUTOR_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java index eea0e2d3b..ca6ec897a 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java @@ -34,6 +34,9 @@ public class SyncModule { /** * The maximum number of validation tasks to delegate to the crypto * executor concurrently. + *

+ * The number of available processors can change during the lifetime of the + * JVM, so this is just a reasonable guess. */ private static final int MAX_CONCURRENT_VALIDATION_TASKS = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);