mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-18 13:49:53 +01:00
Merge branch '675-polite-executor' into 'master'
Use a polite executor for validation tasks Closes #675 See merge request !507
This commit is contained in:
@@ -0,0 +1,84 @@
|
|||||||
|
package org.briarproject.bramble;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import javax.annotation.concurrent.GuardedBy;
|
||||||
|
|
||||||
|
import static java.util.logging.Level.FINE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link Executor} that delegates its tasks to another {@link Executor}
|
||||||
|
* while limiting the number of tasks that are delegated concurrently. Tasks
|
||||||
|
* are delegated in the order they are submitted to this executor.
|
||||||
|
*/
|
||||||
|
@NotNullByDefault
|
||||||
|
public class PoliteExecutor implements Executor {
|
||||||
|
|
||||||
|
private static final Level LOG_LEVEL = FINE;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
@GuardedBy("lock")
|
||||||
|
private final Queue<Runnable> queue = new LinkedList<Runnable>();
|
||||||
|
private final Executor delegate;
|
||||||
|
private final int maxConcurrentTasks;
|
||||||
|
private final Logger log;
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
private int concurrentTasks = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param tag the tag to be used for logging
|
||||||
|
* @param delegate the executor to which tasks will be delegated
|
||||||
|
* @param maxConcurrentTasks the maximum number of tasks that will be
|
||||||
|
* delegated concurrently. If this is set to 1, tasks submitted to this
|
||||||
|
* executor will run in the order they are submitted and will not run
|
||||||
|
* concurrently
|
||||||
|
*/
|
||||||
|
public PoliteExecutor(String tag, Executor delegate,
|
||||||
|
int maxConcurrentTasks) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
this.maxConcurrentTasks = maxConcurrentTasks;
|
||||||
|
log = Logger.getLogger(tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(final Runnable r) {
|
||||||
|
final long submitted = System.currentTimeMillis();
|
||||||
|
Runnable wrapped = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (log.isLoggable(LOG_LEVEL)) {
|
||||||
|
long queued = System.currentTimeMillis() - submitted;
|
||||||
|
log.log(LOG_LEVEL, "Queue time " + queued + " ms");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
r.run();
|
||||||
|
} finally {
|
||||||
|
scheduleNext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
synchronized (lock) {
|
||||||
|
if (concurrentTasks < maxConcurrentTasks) {
|
||||||
|
concurrentTasks++;
|
||||||
|
delegate.execute(wrapped);
|
||||||
|
} else {
|
||||||
|
queue.add(wrapped);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void scheduleNext() {
|
||||||
|
synchronized (lock) {
|
||||||
|
Runnable next = queue.poll();
|
||||||
|
if (next == null) concurrentTasks--;
|
||||||
|
else delegate.execute(next);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,49 @@
|
|||||||
|
package org.briarproject.bramble;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.RejectedExecutionHandler;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import static java.util.logging.Level.FINE;
|
||||||
|
|
||||||
|
@NotNullByDefault
|
||||||
|
public class TimeLoggingExecutor extends ThreadPoolExecutor {
|
||||||
|
|
||||||
|
private static final Level LOG_LEVEL = FINE;
|
||||||
|
|
||||||
|
private final Logger log;
|
||||||
|
|
||||||
|
public TimeLoggingExecutor(String tag, int corePoolSize, int maxPoolSize,
|
||||||
|
long keepAliveTime, TimeUnit unit,
|
||||||
|
BlockingQueue<Runnable> workQueue,
|
||||||
|
RejectedExecutionHandler handler) {
|
||||||
|
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue,
|
||||||
|
handler);
|
||||||
|
log = Logger.getLogger(tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(final Runnable r) {
|
||||||
|
if (log.isLoggable(LOG_LEVEL)) {
|
||||||
|
final long submitted = System.currentTimeMillis();
|
||||||
|
super.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
long started = System.currentTimeMillis();
|
||||||
|
long queued = started - submitted;
|
||||||
|
log.log(LOG_LEVEL, "Queue time " + queued + " ms");
|
||||||
|
r.run();
|
||||||
|
long executing = System.currentTimeMillis() - started;
|
||||||
|
log.log(LOG_LEVEL, "Execution time " + executing + " ms");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
super.execute(r);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
package org.briarproject.bramble.crypto;
|
package org.briarproject.bramble.crypto;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.TimeLoggingExecutor;
|
||||||
import org.briarproject.bramble.api.crypto.CryptoComponent;
|
import org.briarproject.bramble.api.crypto.CryptoComponent;
|
||||||
import org.briarproject.bramble.api.crypto.CryptoExecutor;
|
import org.briarproject.bramble.api.crypto.CryptoExecutor;
|
||||||
import org.briarproject.bramble.api.crypto.PasswordStrengthEstimator;
|
import org.briarproject.bramble.api.crypto.PasswordStrengthEstimator;
|
||||||
@@ -31,14 +32,17 @@ public class CryptoModule {
|
|||||||
public static class EagerSingletons {
|
public static class EagerSingletons {
|
||||||
@Inject
|
@Inject
|
||||||
@CryptoExecutor
|
@CryptoExecutor
|
||||||
Executor cryptoExecutor;
|
ExecutorService cryptoExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of executor threads.
|
* The maximum number of executor threads.
|
||||||
|
* <p>
|
||||||
|
* The number of available processors can change during the lifetime of the
|
||||||
|
* JVM, so this is just a reasonable guess.
|
||||||
*/
|
*/
|
||||||
private static final int MAX_EXECUTOR_THREADS =
|
private static final int MAX_EXECUTOR_THREADS =
|
||||||
Runtime.getRuntime().availableProcessors();
|
Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
|
||||||
|
|
||||||
private final ExecutorService cryptoExecutor;
|
private final ExecutorService cryptoExecutor;
|
||||||
|
|
||||||
@@ -49,8 +53,8 @@ public class CryptoModule {
|
|||||||
RejectedExecutionHandler policy =
|
RejectedExecutionHandler policy =
|
||||||
new ThreadPoolExecutor.DiscardPolicy();
|
new ThreadPoolExecutor.DiscardPolicy();
|
||||||
// Create a limited # of threads and keep them in the pool for 60 secs
|
// Create a limited # of threads and keep them in the pool for 60 secs
|
||||||
cryptoExecutor = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
|
cryptoExecutor = new TimeLoggingExecutor("CryptoExecutor", 0,
|
||||||
60, SECONDS, queue, policy);
|
MAX_EXECUTOR_THREADS, 60, SECONDS, queue, policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@@ -85,11 +89,18 @@ public class CryptoModule {
|
|||||||
@Provides
|
@Provides
|
||||||
@Singleton
|
@Singleton
|
||||||
@CryptoExecutor
|
@CryptoExecutor
|
||||||
Executor getCryptoExecutor(LifecycleManager lifecycleManager) {
|
ExecutorService getCryptoExecutorService(
|
||||||
|
LifecycleManager lifecycleManager) {
|
||||||
lifecycleManager.registerForShutdown(cryptoExecutor);
|
lifecycleManager.registerForShutdown(cryptoExecutor);
|
||||||
return cryptoExecutor;
|
return cryptoExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@CryptoExecutor
|
||||||
|
Executor getCryptoExecutor() {
|
||||||
|
return cryptoExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
SecureRandom getSecureRandom(CryptoComponent crypto) {
|
SecureRandom getSecureRandom(CryptoComponent crypto) {
|
||||||
return crypto.getSecureRandom();
|
return crypto.getSecureRandom();
|
||||||
|
|||||||
@@ -67,6 +67,7 @@ import javax.annotation.Nullable;
|
|||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
import static java.util.logging.Level.FINE;
|
||||||
import static java.util.logging.Level.WARNING;
|
import static java.util.logging.Level.WARNING;
|
||||||
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
||||||
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
||||||
@@ -130,8 +131,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
|||||||
// Don't allow reentrant locking
|
// Don't allow reentrant locking
|
||||||
if (lock.getReadHoldCount() > 0) throw new IllegalStateException();
|
if (lock.getReadHoldCount() > 0) throw new IllegalStateException();
|
||||||
if (lock.getWriteHoldCount() > 0) throw new IllegalStateException();
|
if (lock.getWriteHoldCount() > 0) throw new IllegalStateException();
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
if (readOnly) lock.readLock().lock();
|
if (readOnly) lock.readLock().lock();
|
||||||
else lock.writeLock().lock();
|
else lock.writeLock().lock();
|
||||||
|
if (LOG.isLoggable(FINE)) {
|
||||||
|
long duration = System.currentTimeMillis() - start;
|
||||||
|
if (readOnly) LOG.fine("Waited " + duration + " ms for read lock");
|
||||||
|
else LOG.fine("Waited " + duration + " ms for write lock");
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
return new Transaction(db.startTransaction(), readOnly);
|
return new Transaction(db.startTransaction(), readOnly);
|
||||||
} catch (DbException e) {
|
} catch (DbException e) {
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package org.briarproject.bramble.db;
|
package org.briarproject.bramble.db;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.TimeLoggingExecutor;
|
||||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||||
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
|
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
|
||||||
|
|
||||||
@@ -36,8 +37,8 @@ public class DatabaseExecutorModule {
|
|||||||
RejectedExecutionHandler policy =
|
RejectedExecutionHandler policy =
|
||||||
new ThreadPoolExecutor.DiscardPolicy();
|
new ThreadPoolExecutor.DiscardPolicy();
|
||||||
// Use a single thread and keep it in the pool for 60 secs
|
// Use a single thread and keep it in the pool for 60 secs
|
||||||
databaseExecutor = new ThreadPoolExecutor(0, 1, 60, SECONDS, queue,
|
databaseExecutor = new TimeLoggingExecutor("DatabaseExecutor", 0, 1,
|
||||||
policy);
|
60, SECONDS, queue, policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package org.briarproject.bramble.sync;
|
package org.briarproject.bramble.sync;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.PoliteExecutor;
|
||||||
import org.briarproject.bramble.api.crypto.CryptoComponent;
|
import org.briarproject.bramble.api.crypto.CryptoComponent;
|
||||||
|
import org.briarproject.bramble.api.crypto.CryptoExecutor;
|
||||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||||
import org.briarproject.bramble.api.event.EventBus;
|
import org.briarproject.bramble.api.event.EventBus;
|
||||||
@@ -29,6 +31,16 @@ public class SyncModule {
|
|||||||
ValidationManager validationManager;
|
ValidationManager validationManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of validation tasks to delegate to the crypto
|
||||||
|
* executor concurrently.
|
||||||
|
* <p>
|
||||||
|
* The number of available processors can change during the lifetime of the
|
||||||
|
* JVM, so this is just a reasonable guess.
|
||||||
|
*/
|
||||||
|
private static final int MAX_CONCURRENT_VALIDATION_TASKS =
|
||||||
|
Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
GroupFactory provideGroupFactory(CryptoComponent crypto) {
|
GroupFactory provideGroupFactory(CryptoComponent crypto) {
|
||||||
return new GroupFactoryImpl(crypto);
|
return new GroupFactoryImpl(crypto);
|
||||||
@@ -62,10 +74,20 @@ public class SyncModule {
|
|||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@Singleton
|
@Singleton
|
||||||
ValidationManager getValidationManager(LifecycleManager lifecycleManager,
|
ValidationManager provideValidationManager(
|
||||||
EventBus eventBus, ValidationManagerImpl validationManager) {
|
LifecycleManager lifecycleManager, EventBus eventBus,
|
||||||
|
ValidationManagerImpl validationManager) {
|
||||||
lifecycleManager.registerService(validationManager);
|
lifecycleManager.registerService(validationManager);
|
||||||
eventBus.addListener(validationManager);
|
eventBus.addListener(validationManager);
|
||||||
return validationManager;
|
return validationManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@Singleton
|
||||||
|
@ValidationExecutor
|
||||||
|
Executor provideValidationExecutor(
|
||||||
|
@CryptoExecutor Executor cryptoExecutor) {
|
||||||
|
return new PoliteExecutor("ValidationExecutor", cryptoExecutor,
|
||||||
|
MAX_CONCURRENT_VALIDATION_TASKS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,25 @@
|
|||||||
|
package org.briarproject.bramble.sync;
|
||||||
|
|
||||||
|
import java.lang.annotation.Retention;
|
||||||
|
import java.lang.annotation.Target;
|
||||||
|
|
||||||
|
import javax.inject.Qualifier;
|
||||||
|
|
||||||
|
import static java.lang.annotation.ElementType.FIELD;
|
||||||
|
import static java.lang.annotation.ElementType.METHOD;
|
||||||
|
import static java.lang.annotation.ElementType.PARAMETER;
|
||||||
|
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Annotation for injecting the executor for validation tasks. Also used for
|
||||||
|
* annotating methods that should run on the validation executor.
|
||||||
|
* <p>
|
||||||
|
* The contract of this executor is that tasks may be run concurrently, and
|
||||||
|
* submitting a task will never block. Tasks must not run indefinitely. Tasks
|
||||||
|
* submitted during shutdown are discarded.
|
||||||
|
*/
|
||||||
|
@Qualifier
|
||||||
|
@Target({FIELD, METHOD, PARAMETER})
|
||||||
|
@Retention(RUNTIME)
|
||||||
|
@interface ValidationExecutor {
|
||||||
|
}
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
package org.briarproject.bramble.sync;
|
package org.briarproject.bramble.sync;
|
||||||
|
|
||||||
import org.briarproject.bramble.api.crypto.CryptoExecutor;
|
|
||||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||||
import org.briarproject.bramble.api.db.DbException;
|
import org.briarproject.bramble.api.db.DbException;
|
||||||
@@ -50,8 +49,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
Logger.getLogger(ValidationManagerImpl.class.getName());
|
Logger.getLogger(ValidationManagerImpl.class.getName());
|
||||||
|
|
||||||
private final DatabaseComponent db;
|
private final DatabaseComponent db;
|
||||||
private final Executor dbExecutor;
|
private final Executor dbExecutor, validationExecutor;
|
||||||
private final Executor cryptoExecutor;
|
|
||||||
private final MessageFactory messageFactory;
|
private final MessageFactory messageFactory;
|
||||||
private final Map<ClientId, MessageValidator> validators;
|
private final Map<ClientId, MessageValidator> validators;
|
||||||
private final Map<ClientId, IncomingMessageHook> hooks;
|
private final Map<ClientId, IncomingMessageHook> hooks;
|
||||||
@@ -60,11 +58,11 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
@Inject
|
@Inject
|
||||||
ValidationManagerImpl(DatabaseComponent db,
|
ValidationManagerImpl(DatabaseComponent db,
|
||||||
@DatabaseExecutor Executor dbExecutor,
|
@DatabaseExecutor Executor dbExecutor,
|
||||||
@CryptoExecutor Executor cryptoExecutor,
|
@ValidationExecutor Executor validationExecutor,
|
||||||
MessageFactory messageFactory) {
|
MessageFactory messageFactory) {
|
||||||
this.db = db;
|
this.db = db;
|
||||||
this.dbExecutor = dbExecutor;
|
this.dbExecutor = dbExecutor;
|
||||||
this.cryptoExecutor = cryptoExecutor;
|
this.validationExecutor = validationExecutor;
|
||||||
this.messageFactory = messageFactory;
|
this.messageFactory = messageFactory;
|
||||||
validators = new ConcurrentHashMap<ClientId, MessageValidator>();
|
validators = new ConcurrentHashMap<ClientId, MessageValidator>();
|
||||||
hooks = new ConcurrentHashMap<ClientId, IncomingMessageHook>();
|
hooks = new ConcurrentHashMap<ClientId, IncomingMessageHook>();
|
||||||
@@ -104,6 +102,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void validateOutstandingMessages(ClientId c) {
|
private void validateOutstandingMessages(ClientId c) {
|
||||||
try {
|
try {
|
||||||
Queue<MessageId> unvalidated = new LinkedList<MessageId>();
|
Queue<MessageId> unvalidated = new LinkedList<MessageId>();
|
||||||
@@ -130,6 +129,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void validateNextMessage(Queue<MessageId> unvalidated) {
|
private void validateNextMessage(Queue<MessageId> unvalidated) {
|
||||||
try {
|
try {
|
||||||
Message m;
|
Message m;
|
||||||
@@ -167,6 +167,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void deliverOutstandingMessages(ClientId c) {
|
private void deliverOutstandingMessages(ClientId c) {
|
||||||
try {
|
try {
|
||||||
Queue<MessageId> pending = new LinkedList<MessageId>();
|
Queue<MessageId> pending = new LinkedList<MessageId>();
|
||||||
@@ -194,6 +195,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void deliverNextPendingMessage(Queue<MessageId> pending) {
|
private void deliverNextPendingMessage(Queue<MessageId> pending) {
|
||||||
try {
|
try {
|
||||||
boolean anyInvalid = false, allDelivered = true;
|
boolean anyInvalid = false, allDelivered = true;
|
||||||
@@ -220,8 +222,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
Message m = messageFactory.createMessage(id, raw);
|
Message m = messageFactory.createMessage(id, raw);
|
||||||
Group g = db.getGroup(txn, m.getGroupId());
|
Group g = db.getGroup(txn, m.getGroupId());
|
||||||
ClientId c = g.getClientId();
|
ClientId c = g.getClientId();
|
||||||
Metadata meta = db.getMessageMetadataForValidator(txn,
|
Metadata meta =
|
||||||
id);
|
db.getMessageMetadataForValidator(txn, id);
|
||||||
DeliveryResult result = deliverMessage(txn, m, c, meta);
|
DeliveryResult result = deliverMessage(txn, m, c, meta);
|
||||||
if (result.valid) {
|
if (result.valid) {
|
||||||
pending.addAll(getPendingDependents(txn, id));
|
pending.addAll(getPendingDependents(txn, id));
|
||||||
@@ -240,8 +242,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
db.endTransaction(txn);
|
db.endTransaction(txn);
|
||||||
}
|
}
|
||||||
if (invalidate != null) invalidateNextMessageAsync(invalidate);
|
if (invalidate != null) invalidateNextMessageAsync(invalidate);
|
||||||
deliverNextPendingMessageAsync(pending);
|
|
||||||
if (toShare != null) shareNextMessageAsync(toShare);
|
if (toShare != null) shareNextMessageAsync(toShare);
|
||||||
|
deliverNextPendingMessageAsync(pending);
|
||||||
} catch (NoSuchMessageException e) {
|
} catch (NoSuchMessageException e) {
|
||||||
LOG.info("Message removed before delivery");
|
LOG.info("Message removed before delivery");
|
||||||
deliverNextPendingMessageAsync(pending);
|
deliverNextPendingMessageAsync(pending);
|
||||||
@@ -249,13 +251,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
LOG.info("Group removed before delivery");
|
LOG.info("Group removed before delivery");
|
||||||
deliverNextPendingMessageAsync(pending);
|
deliverNextPendingMessageAsync(pending);
|
||||||
} catch (DbException e) {
|
} catch (DbException e) {
|
||||||
if (LOG.isLoggable(WARNING))
|
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||||
LOG.log(WARNING, e.toString(), e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateMessageAsync(final Message m, final Group g) {
|
private void validateMessageAsync(final Message m, final Group g) {
|
||||||
cryptoExecutor.execute(new Runnable() {
|
validationExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
validateMessage(m, g);
|
validateMessage(m, g);
|
||||||
@@ -263,10 +264,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ValidationExecutor
|
||||||
private void validateMessage(Message m, Group g) {
|
private void validateMessage(Message m, Group g) {
|
||||||
MessageValidator v = validators.get(g.getClientId());
|
MessageValidator v = validators.get(g.getClientId());
|
||||||
if (v == null) {
|
if (v == null) {
|
||||||
LOG.warning("No validator");
|
if (LOG.isLoggable(WARNING))
|
||||||
|
LOG.warning("No validator for " + g.getClientId().getString());
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
MessageContext context = v.validateMessage(m, g);
|
MessageContext context = v.validateMessage(m, g);
|
||||||
@@ -291,6 +294,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void storeMessageContext(Message m, ClientId c,
|
private void storeMessageContext(Message m, ClientId c,
|
||||||
MessageContext context) {
|
MessageContext context) {
|
||||||
try {
|
try {
|
||||||
@@ -353,6 +357,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private DeliveryResult deliverMessage(Transaction txn, Message m,
|
private DeliveryResult deliverMessage(Transaction txn, Message m,
|
||||||
ClientId c, Metadata meta) throws DbException {
|
ClientId c, Metadata meta) throws DbException {
|
||||||
// Deliver the message to the client if it's registered a hook
|
// Deliver the message to the client if it's registered a hook
|
||||||
@@ -362,10 +367,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
try {
|
try {
|
||||||
shareMsg = hook.incomingMessage(txn, m, meta);
|
shareMsg = hook.incomingMessage(txn, m, meta);
|
||||||
} catch (InvalidMessageException e) {
|
} catch (InvalidMessageException e) {
|
||||||
// message is invalid, mark it as such and delete it
|
invalidateMessage(txn, m.getId());
|
||||||
db.setMessageState(txn, m.getId(), INVALID);
|
|
||||||
db.deleteMessageMetadata(txn, m.getId());
|
|
||||||
db.deleteMessage(txn, m.getId());
|
|
||||||
return new DeliveryResult(false, false);
|
return new DeliveryResult(false, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -373,6 +375,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
return new DeliveryResult(true, shareMsg);
|
return new DeliveryResult(true, shareMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m)
|
private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
Queue<MessageId> pending = new LinkedList<MessageId>();
|
Queue<MessageId> pending = new LinkedList<MessageId>();
|
||||||
@@ -392,6 +395,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void shareOutstandingMessages(ClientId c) {
|
private void shareOutstandingMessages(ClientId c) {
|
||||||
try {
|
try {
|
||||||
Queue<MessageId> toShare = new LinkedList<MessageId>();
|
Queue<MessageId> toShare = new LinkedList<MessageId>();
|
||||||
@@ -424,6 +428,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void shareNextMessage(Queue<MessageId> toShare) {
|
private void shareNextMessage(Queue<MessageId> toShare) {
|
||||||
try {
|
try {
|
||||||
Transaction txn = db.startTransaction(false);
|
Transaction txn = db.startTransaction(false);
|
||||||
@@ -457,6 +462,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void invalidateNextMessage(Queue<MessageId> invalidate) {
|
private void invalidateNextMessage(Queue<MessageId> invalidate) {
|
||||||
try {
|
try {
|
||||||
Transaction txn = db.startTransaction(false);
|
Transaction txn = db.startTransaction(false);
|
||||||
@@ -479,6 +485,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void invalidateMessage(Transaction txn, MessageId m)
|
private void invalidateMessage(Transaction txn, MessageId m)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
db.setMessageState(txn, m, INVALID);
|
db.setMessageState(txn, m, INVALID);
|
||||||
@@ -486,6 +493,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
db.deleteMessageMetadata(txn, m);
|
db.deleteMessageMetadata(txn, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private Queue<MessageId> getDependentsToInvalidate(Transaction txn,
|
private Queue<MessageId> getDependentsToInvalidate(Transaction txn,
|
||||||
MessageId m) throws DbException {
|
MessageId m) throws DbException {
|
||||||
Queue<MessageId> invalidate = new LinkedList<MessageId>();
|
Queue<MessageId> invalidate = new LinkedList<MessageId>();
|
||||||
@@ -515,6 +523,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DatabaseExecutor
|
||||||
private void loadGroupAndValidate(final Message m) {
|
private void loadGroupAndValidate(final Message m) {
|
||||||
try {
|
try {
|
||||||
Group g;
|
Group g;
|
||||||
@@ -534,6 +543,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static class DeliveryResult {
|
private static class DeliveryResult {
|
||||||
|
|
||||||
private final boolean valid, share;
|
private final boolean valid, share;
|
||||||
|
|
||||||
private DeliveryResult(boolean valid, boolean share) {
|
private DeliveryResult(boolean valid, boolean share) {
|
||||||
|
|||||||
@@ -0,0 +1,142 @@
|
|||||||
|
package org.briarproject.bramble;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.test.BrambleTestCase;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Vector;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
public class PoliteExecutorTest extends BrambleTestCase {
|
||||||
|
|
||||||
|
private static final String TAG = "Test";
|
||||||
|
private static final int TASKS = 10;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTasksAreDelegatedInOrderOfSubmission() throws Exception {
|
||||||
|
// Delegate to a single-threaded executor
|
||||||
|
Executor delegate = Executors.newSingleThreadExecutor();
|
||||||
|
// Allow all the tasks to be delegated straight away
|
||||||
|
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2);
|
||||||
|
final List<Integer> list = new Vector<Integer>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(TASKS);
|
||||||
|
for (int i = 0; i < TASKS; i++) {
|
||||||
|
final int result = i;
|
||||||
|
polite.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
list.add(result);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// Wait for all the tasks to finish
|
||||||
|
latch.await();
|
||||||
|
// The tasks should have run in the order they were submitted
|
||||||
|
assertEquals(ascendingOrder(), list);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueuedTasksAreDelegatedInOrderOfSubmission()
|
||||||
|
throws Exception {
|
||||||
|
// Delegate to a single-threaded executor
|
||||||
|
Executor delegate = Executors.newSingleThreadExecutor();
|
||||||
|
// Allow two tasks to be delegated at a time
|
||||||
|
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 2);
|
||||||
|
final List<Integer> list = new Vector<Integer>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(TASKS);
|
||||||
|
for (int i = 0; i < TASKS; i++) {
|
||||||
|
final int result = i;
|
||||||
|
polite.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
list.add(result);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// Wait for all the tasks to finish
|
||||||
|
latch.await();
|
||||||
|
// The tasks should have run in the order they were submitted
|
||||||
|
assertEquals(ascendingOrder(), list);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTasksRunInParallelOnDelegate() throws Exception {
|
||||||
|
// Delegate to a multi-threaded executor
|
||||||
|
Executor delegate = Executors.newCachedThreadPool();
|
||||||
|
// Allow all the tasks to be delegated straight away
|
||||||
|
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2);
|
||||||
|
final List<Integer> list = new Vector<Integer>();
|
||||||
|
final CountDownLatch[] latches = new CountDownLatch[TASKS];
|
||||||
|
for (int i = 0; i < TASKS; i++) latches[i] = new CountDownLatch(1);
|
||||||
|
for (int i = 0; i < TASKS; i++) {
|
||||||
|
final int result = i;
|
||||||
|
polite.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
// Each task waits for the next task, if any, to finish
|
||||||
|
if (result < TASKS - 1) latches[result + 1].await();
|
||||||
|
list.add(result);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
latches[result].countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// Wait for all the tasks to finish
|
||||||
|
for (int i = 0; i < TASKS; i++) latches[i].await();
|
||||||
|
// The tasks should have finished in reverse order
|
||||||
|
assertEquals(descendingOrder(), list);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTasksDoNotRunInParallelOnDelegate() throws Exception {
|
||||||
|
// Delegate to a multi-threaded executor
|
||||||
|
Executor delegate = Executors.newCachedThreadPool();
|
||||||
|
// Allow one task to be delegated at a time
|
||||||
|
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 1);
|
||||||
|
final List<Integer> list = new Vector<Integer>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(TASKS);
|
||||||
|
for (int i = 0; i < TASKS; i++) {
|
||||||
|
final int result = i;
|
||||||
|
polite.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
// Each task runs faster than the previous task
|
||||||
|
Thread.sleep(TASKS - result);
|
||||||
|
list.add(result);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// Wait for all the tasks to finish
|
||||||
|
latch.await();
|
||||||
|
// The tasks should have finished in the order they were submitted
|
||||||
|
assertEquals(ascendingOrder(), list);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Integer> ascendingOrder() {
|
||||||
|
Integer[] array = new Integer[TASKS];
|
||||||
|
for (int i = 0; i < TASKS; i++) array[i] = i;
|
||||||
|
return Arrays.asList(array);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Integer> descendingOrder() {
|
||||||
|
Integer[] array = new Integer[TASKS];
|
||||||
|
for (int i = 0; i < TASKS; i++) array[i] = TASKS - 1 - i;
|
||||||
|
return Arrays.asList(array);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -19,12 +19,12 @@ import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook;
|
|||||||
import org.briarproject.bramble.api.sync.ValidationManager.MessageValidator;
|
import org.briarproject.bramble.api.sync.ValidationManager.MessageValidator;
|
||||||
import org.briarproject.bramble.api.sync.ValidationManager.State;
|
import org.briarproject.bramble.api.sync.ValidationManager.State;
|
||||||
import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
|
import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
|
||||||
import org.briarproject.bramble.test.BrambleTestCase;
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
import org.briarproject.bramble.test.ImmediateExecutor;
|
import org.briarproject.bramble.test.ImmediateExecutor;
|
||||||
import org.briarproject.bramble.test.TestUtils;
|
import org.briarproject.bramble.test.TestUtils;
|
||||||
import org.briarproject.bramble.util.ByteUtils;
|
import org.briarproject.bramble.util.ByteUtils;
|
||||||
import org.jmock.Expectations;
|
import org.jmock.Expectations;
|
||||||
import org.jmock.Mockery;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -38,8 +38,18 @@ import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID;
|
|||||||
import static org.briarproject.bramble.api.sync.ValidationManager.State.PENDING;
|
import static org.briarproject.bramble.api.sync.ValidationManager.State.PENDING;
|
||||||
import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN;
|
import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN;
|
||||||
|
|
||||||
public class ValidationManagerImplTest extends BrambleTestCase {
|
public class ValidationManagerImplTest extends BrambleMockTestCase {
|
||||||
|
|
||||||
|
private final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||||
|
private final MessageFactory messageFactory =
|
||||||
|
context.mock(MessageFactory.class);
|
||||||
|
private final MessageValidator validator =
|
||||||
|
context.mock(MessageValidator.class);
|
||||||
|
private final IncomingMessageHook hook =
|
||||||
|
context.mock(IncomingMessageHook.class);
|
||||||
|
|
||||||
|
private final Executor dbExecutor = new ImmediateExecutor();
|
||||||
|
private final Executor validationExecutor = new ImmediateExecutor();
|
||||||
private final ClientId clientId =
|
private final ClientId clientId =
|
||||||
new ClientId(TestUtils.getRandomString(5));
|
new ClientId(TestUtils.getRandomString(5));
|
||||||
private final MessageId messageId = new MessageId(TestUtils.getRandomId());
|
private final MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||||
@@ -63,23 +73,58 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
private final MessageContext validResultWithDependencies =
|
private final MessageContext validResultWithDependencies =
|
||||||
new MessageContext(metadata, Collections.singletonList(messageId1));
|
new MessageContext(metadata, Collections.singletonList(messageId1));
|
||||||
|
|
||||||
|
private ValidationManagerImpl vm;
|
||||||
|
|
||||||
public ValidationManagerImplTest() {
|
public ValidationManagerImplTest() {
|
||||||
// Encode the messages
|
// Encode the messages
|
||||||
System.arraycopy(groupId.getBytes(), 0, raw, 0, UniqueId.LENGTH);
|
System.arraycopy(groupId.getBytes(), 0, raw, 0, UniqueId.LENGTH);
|
||||||
ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH);
|
ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
vm = new ValidationManagerImpl(db, dbExecutor, validationExecutor,
|
||||||
|
messageFactory);
|
||||||
|
vm.registerMessageValidator(clientId, validator);
|
||||||
|
vm.registerIncomingMessageHook(clientId, hook);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStartAndStop() throws Exception {
|
||||||
|
final Transaction txn = new Transaction(null, true);
|
||||||
|
final Transaction txn1 = new Transaction(null, true);
|
||||||
|
final Transaction txn2 = new Transaction(null, true);
|
||||||
|
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
// validateOutstandingMessages()
|
||||||
|
oneOf(db).startTransaction(true);
|
||||||
|
will(returnValue(txn));
|
||||||
|
oneOf(db).getMessagesToValidate(txn, clientId);
|
||||||
|
will(returnValue(Collections.emptyList()));
|
||||||
|
oneOf(db).commitTransaction(txn);
|
||||||
|
oneOf(db).endTransaction(txn);
|
||||||
|
// deliverOutstandingMessages()
|
||||||
|
oneOf(db).startTransaction(true);
|
||||||
|
will(returnValue(txn1));
|
||||||
|
oneOf(db).getPendingMessages(txn1, clientId);
|
||||||
|
will(returnValue(Collections.emptyList()));
|
||||||
|
oneOf(db).commitTransaction(txn1);
|
||||||
|
oneOf(db).endTransaction(txn1);
|
||||||
|
// shareOutstandingMessages()
|
||||||
|
oneOf(db).startTransaction(true);
|
||||||
|
will(returnValue(txn2));
|
||||||
|
oneOf(db).getMessagesToShare(txn2, clientId);
|
||||||
|
will(returnValue(Collections.emptyList()));
|
||||||
|
oneOf(db).commitTransaction(txn2);
|
||||||
|
oneOf(db).endTransaction(txn2);
|
||||||
|
}});
|
||||||
|
|
||||||
|
vm.startService();
|
||||||
|
vm.stopService();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMessagesAreValidatedAtStartup() throws Exception {
|
public void testMessagesAreValidatedAtStartup() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, true);
|
final Transaction txn1 = new Transaction(null, true);
|
||||||
final Transaction txn2 = new Transaction(null, false);
|
final Transaction txn2 = new Transaction(null, false);
|
||||||
@@ -87,6 +132,7 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
final Transaction txn4 = new Transaction(null, false);
|
final Transaction txn4 = new Transaction(null, false);
|
||||||
final Transaction txn5 = new Transaction(null, true);
|
final Transaction txn5 = new Transaction(null, true);
|
||||||
final Transaction txn6 = new Transaction(null, true);
|
final Transaction txn6 = new Transaction(null, true);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Get messages to validate
|
// Get messages to validate
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -165,26 +211,11 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn6);
|
oneOf(db).endTransaction(txn6);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.startService();
|
vm.startService();
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPendingMessagesAreDeliveredAtStartup() throws Exception {
|
public void testPendingMessagesAreDeliveredAtStartup() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, true);
|
final Transaction txn1 = new Transaction(null, true);
|
||||||
final Transaction txn2 = new Transaction(null, false);
|
final Transaction txn2 = new Transaction(null, false);
|
||||||
@@ -266,26 +297,11 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn4);
|
oneOf(db).endTransaction(txn4);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.startService();
|
vm.startService();
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMessagesAreSharedAtStartup() throws Exception {
|
public void testMessagesAreSharedAtStartup() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, true);
|
final Transaction txn1 = new Transaction(null, true);
|
||||||
final Transaction txn2 = new Transaction(null, true);
|
final Transaction txn2 = new Transaction(null, true);
|
||||||
@@ -333,29 +349,15 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn4);
|
oneOf(db).endTransaction(txn4);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.startService();
|
vm.startService();
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIncomingMessagesAreShared() throws Exception {
|
public void testIncomingMessagesAreShared() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, false);
|
final Transaction txn1 = new Transaction(null, false);
|
||||||
final Transaction txn2 = new Transaction(null, false);
|
final Transaction txn2 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -396,33 +398,19 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn2);
|
oneOf(db).endTransaction(txn2);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testValidationContinuesAfterNoSuchMessageException()
|
public void testValidationContinuesAfterNoSuchMessageException()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, true);
|
final Transaction txn1 = new Transaction(null, true);
|
||||||
final Transaction txn2 = new Transaction(null, true);
|
final Transaction txn2 = new Transaction(null, true);
|
||||||
final Transaction txn3 = new Transaction(null, false);
|
final Transaction txn3 = new Transaction(null, false);
|
||||||
final Transaction txn4 = new Transaction(null, true);
|
final Transaction txn4 = new Transaction(null, true);
|
||||||
final Transaction txn5 = new Transaction(null, true);
|
final Transaction txn5 = new Transaction(null, true);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Get messages to validate
|
// Get messages to validate
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -481,33 +469,19 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn5);
|
oneOf(db).endTransaction(txn5);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.startService();
|
vm.startService();
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testValidationContinuesAfterNoSuchGroupException()
|
public void testValidationContinuesAfterNoSuchGroupException()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, true);
|
final Transaction txn1 = new Transaction(null, true);
|
||||||
final Transaction txn2 = new Transaction(null, true);
|
final Transaction txn2 = new Transaction(null, true);
|
||||||
final Transaction txn3 = new Transaction(null, false);
|
final Transaction txn3 = new Transaction(null, false);
|
||||||
final Transaction txn4 = new Transaction(null, true);
|
final Transaction txn4 = new Transaction(null, true);
|
||||||
final Transaction txn5 = new Transaction(null, true);
|
final Transaction txn5 = new Transaction(null, true);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Get messages to validate
|
// Get messages to validate
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -571,28 +545,14 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn5);
|
oneOf(db).endTransaction(txn5);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.startService();
|
vm.startService();
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNonLocalMessagesAreValidatedWhenAdded() throws Exception {
|
public void testNonLocalMessagesAreValidatedWhenAdded() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, false);
|
final Transaction txn1 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -619,51 +579,20 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn1);
|
oneOf(db).endTransaction(txn1);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLocalMessagesAreNotValidatedWhenAdded() throws Exception {
|
public void testLocalMessagesAreNotValidatedWhenAdded() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, null));
|
vm.eventOccurred(new MessageAddedEvent(message, null));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMessagesWithUndeliveredDependenciesArePending()
|
public void testMessagesWithUndeliveredDependenciesArePending()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, false);
|
final Transaction txn1 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -688,29 +617,15 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn1);
|
oneOf(db).endTransaction(txn1);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMessagesWithDeliveredDependenciesGetDelivered()
|
public void testMessagesWithDeliveredDependenciesGetDelivered()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, false);
|
final Transaction txn1 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -741,30 +656,16 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn1);
|
oneOf(db).endTransaction(txn1);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMessagesWithInvalidDependenciesAreInvalid()
|
public void testMessagesWithInvalidDependenciesAreInvalid()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, false);
|
final Transaction txn1 = new Transaction(null, false);
|
||||||
final Transaction txn2 = new Transaction(null, false);
|
final Transaction txn2 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -809,26 +710,11 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn2);
|
oneOf(db).endTransaction(txn2);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRecursiveInvalidation() throws Exception {
|
public void testRecursiveInvalidation() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final MessageId messageId3 = new MessageId(TestUtils.getRandomId());
|
final MessageId messageId3 = new MessageId(TestUtils.getRandomId());
|
||||||
final MessageId messageId4 = new MessageId(TestUtils.getRandomId());
|
final MessageId messageId4 = new MessageId(TestUtils.getRandomId());
|
||||||
final Map<MessageId, State> twoDependents =
|
final Map<MessageId, State> twoDependents =
|
||||||
@@ -842,6 +728,7 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
final Transaction txn4 = new Transaction(null, false);
|
final Transaction txn4 = new Transaction(null, false);
|
||||||
final Transaction txn5 = new Transaction(null, false);
|
final Transaction txn5 = new Transaction(null, false);
|
||||||
final Transaction txn6 = new Transaction(null, false);
|
final Transaction txn6 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -927,26 +814,11 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn6);
|
oneOf(db).endTransaction(txn6);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPendingDependentsGetDelivered() throws Exception {
|
public void testPendingDependentsGetDelivered() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final MessageId messageId3 = new MessageId(TestUtils.getRandomId());
|
final MessageId messageId3 = new MessageId(TestUtils.getRandomId());
|
||||||
final MessageId messageId4 = new MessageId(TestUtils.getRandomId());
|
final MessageId messageId4 = new MessageId(TestUtils.getRandomId());
|
||||||
final Message message3 = new Message(messageId3, groupId, timestamp,
|
final Message message3 = new Message(messageId3, groupId, timestamp,
|
||||||
@@ -968,6 +840,7 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
final Transaction txn4 = new Transaction(null, false);
|
final Transaction txn4 = new Transaction(null, false);
|
||||||
final Transaction txn5 = new Transaction(null, false);
|
final Transaction txn5 = new Transaction(null, false);
|
||||||
final Transaction txn6 = new Transaction(null, false);
|
final Transaction txn6 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -1100,26 +973,11 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn6);
|
oneOf(db).endTransaction(txn6);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOnlyReadyPendingDependentsGetDelivered() throws Exception {
|
public void testOnlyReadyPendingDependentsGetDelivered() throws Exception {
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Map<MessageId, State> twoDependencies =
|
final Map<MessageId, State> twoDependencies =
|
||||||
new LinkedHashMap<MessageId, State>();
|
new LinkedHashMap<MessageId, State>();
|
||||||
twoDependencies.put(messageId, DELIVERED);
|
twoDependencies.put(messageId, DELIVERED);
|
||||||
@@ -1127,6 +985,7 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
final Transaction txn = new Transaction(null, true);
|
final Transaction txn = new Transaction(null, true);
|
||||||
final Transaction txn1 = new Transaction(null, false);
|
final Transaction txn1 = new Transaction(null, false);
|
||||||
final Transaction txn2 = new Transaction(null, false);
|
final Transaction txn2 = new Transaction(null, false);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
// Load the group
|
// Load the group
|
||||||
oneOf(db).startTransaction(true);
|
oneOf(db).startTransaction(true);
|
||||||
@@ -1162,86 +1021,6 @@ public class ValidationManagerImplTest extends BrambleTestCase {
|
|||||||
oneOf(db).endTransaction(txn2);
|
oneOf(db).endTransaction(txn2);
|
||||||
}});
|
}});
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMessageDependencyCycle() throws Exception {
|
|
||||||
final MessageContext cycleContext = new MessageContext(metadata,
|
|
||||||
Collections.singletonList(messageId));
|
|
||||||
|
|
||||||
Mockery context = new Mockery();
|
|
||||||
final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
|
||||||
final Executor dbExecutor = new ImmediateExecutor();
|
|
||||||
final Executor cryptoExecutor = new ImmediateExecutor();
|
|
||||||
final MessageFactory messageFactory =
|
|
||||||
context.mock(MessageFactory.class);
|
|
||||||
final MessageValidator validator = context.mock(MessageValidator.class);
|
|
||||||
final IncomingMessageHook hook =
|
|
||||||
context.mock(IncomingMessageHook.class);
|
|
||||||
final Transaction txn = new Transaction(null, true);
|
|
||||||
final Transaction txn1 = new Transaction(null, false);
|
|
||||||
final Transaction txn2 = new Transaction(null, true);
|
|
||||||
final Transaction txn3 = new Transaction(null, false);
|
|
||||||
context.checking(new Expectations() {{
|
|
||||||
// Load the group
|
|
||||||
oneOf(db).startTransaction(true);
|
|
||||||
will(returnValue(txn));
|
|
||||||
oneOf(db).getGroup(txn, groupId);
|
|
||||||
will(returnValue(group));
|
|
||||||
oneOf(db).commitTransaction(txn);
|
|
||||||
oneOf(db).endTransaction(txn);
|
|
||||||
// Validate the message: valid
|
|
||||||
oneOf(validator).validateMessage(message, group);
|
|
||||||
will(returnValue(validResultWithDependencies));
|
|
||||||
// Store the validation result
|
|
||||||
oneOf(db).startTransaction(false);
|
|
||||||
will(returnValue(txn1));
|
|
||||||
oneOf(db).addMessageDependencies(txn1, message,
|
|
||||||
validResultWithDependencies.getDependencies());
|
|
||||||
oneOf(db).getMessageDependencies(txn1, messageId);
|
|
||||||
will(returnValue(Collections.singletonMap(messageId1, UNKNOWN)));
|
|
||||||
oneOf(db).mergeMessageMetadata(txn1, messageId, metadata);
|
|
||||||
oneOf(db).setMessageState(txn1, messageId, PENDING);
|
|
||||||
oneOf(db).commitTransaction(txn1);
|
|
||||||
oneOf(db).endTransaction(txn1);
|
|
||||||
// Second message is coming in
|
|
||||||
oneOf(db).startTransaction(true);
|
|
||||||
will(returnValue(txn2));
|
|
||||||
oneOf(db).getGroup(txn2, groupId);
|
|
||||||
will(returnValue(group));
|
|
||||||
oneOf(db).commitTransaction(txn2);
|
|
||||||
oneOf(db).endTransaction(txn2);
|
|
||||||
// Validate the message: valid
|
|
||||||
oneOf(validator).validateMessage(message1, group);
|
|
||||||
will(returnValue(cycleContext));
|
|
||||||
// Store the validation result
|
|
||||||
oneOf(db).startTransaction(false);
|
|
||||||
will(returnValue(txn3));
|
|
||||||
oneOf(db).addMessageDependencies(txn3, message1,
|
|
||||||
cycleContext.getDependencies());
|
|
||||||
oneOf(db).getMessageDependencies(txn3, messageId1);
|
|
||||||
will(returnValue(Collections.singletonMap(messageId, PENDING)));
|
|
||||||
oneOf(db).mergeMessageMetadata(txn3, messageId1, metadata);
|
|
||||||
oneOf(db).setMessageState(txn3, messageId1, PENDING);
|
|
||||||
oneOf(db).commitTransaction(txn3);
|
|
||||||
oneOf(db).endTransaction(txn3);
|
|
||||||
}});
|
|
||||||
|
|
||||||
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
|
|
||||||
cryptoExecutor, messageFactory);
|
|
||||||
vm.registerMessageValidator(clientId, validator);
|
|
||||||
vm.registerIncomingMessageHook(clientId, hook);
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message, contactId));
|
|
||||||
vm.eventOccurred(new MessageAddedEvent(message1, contactId));
|
|
||||||
|
|
||||||
context.assertIsSatisfied();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user