Merge branch 'mailbox-integration-tests' into 'master'

First integration test for mailbox with two contacts

See merge request briar/briar!1725
This commit is contained in:
akwizgran
2022-11-07 12:58:18 +00:00
53 changed files with 800 additions and 316 deletions

View File

@@ -4,6 +4,7 @@ import org.briarproject.nullsafety.NotNullByDefault;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -19,9 +20,10 @@ public class TimeLoggingExecutor extends ThreadPoolExecutor {
public TimeLoggingExecutor(String tag, int corePoolSize, int maxPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue,
handler);
threadFactory, handler);
log = Logger.getLogger(tag);
}

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Inject;
@@ -37,31 +38,31 @@ public class CryptoExecutorModule {
private static final int MAX_EXECUTOR_THREADS =
Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
private final ExecutorService cryptoExecutor;
public CryptoExecutorModule() {
// Use an unbounded queue
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create a limited # of threads and keep them in the pool for 60 secs
cryptoExecutor = new TimeLoggingExecutor("CryptoExecutor", 0,
MAX_EXECUTOR_THREADS, 60, SECONDS, queue, policy);
}
@Provides
@Singleton
@CryptoExecutor
ExecutorService provideCryptoExecutorService(
LifecycleManager lifecycleManager) {
LifecycleManager lifecycleManager, ThreadFactory threadFactory) {
// Use an unbounded queue
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create a limited # of threads and keep them in the pool for 60 secs
ExecutorService cryptoExecutor = new TimeLoggingExecutor(
"CryptoExecutor", 0, MAX_EXECUTOR_THREADS, 60, SECONDS, queue,
threadFactory, policy);
lifecycleManager.registerForShutdown(cryptoExecutor);
return cryptoExecutor;
}
@Provides
@CryptoExecutor
Executor provideCryptoExecutor() {
Executor provideCryptoExecutor(
@CryptoExecutor ExecutorService cryptoExecutor) {
return cryptoExecutor;
}
}

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Inject;
@@ -28,24 +29,20 @@ public class DatabaseExecutorModule {
ExecutorService executorService;
}
private final ExecutorService databaseExecutor;
public DatabaseExecutorModule() {
@Provides
@Singleton
@DatabaseExecutor
ExecutorService provideDatabaseExecutorService(
LifecycleManager lifecycleManager, ThreadFactory threadFactory) {
// Use an unbounded queue
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Use a single thread and keep it in the pool for 60 secs
databaseExecutor = new TimeLoggingExecutor("DatabaseExecutor", 0, 1,
60, SECONDS, queue, policy);
}
@Provides
@Singleton
@DatabaseExecutor
ExecutorService provideDatabaseExecutorService(
LifecycleManager lifecycleManager) {
ExecutorService databaseExecutor = new TimeLoggingExecutor(
"DatabaseExecutor", 0, 1, 60, SECONDS, queue, threadFactory,
policy);
lifecycleManager.registerForShutdown(databaseExecutor);
return databaseExecutor;
}

View File

@@ -3,6 +3,7 @@ package org.briarproject.bramble.event;
import org.briarproject.bramble.api.event.EventExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import javax.inject.Singleton;
@@ -22,10 +23,11 @@ public class DefaultEventExecutorModule {
@Provides
@Singleton
@EventExecutor
Executor provideEventExecutor() {
Executor provideEventExecutor(ThreadFactory threadFactory) {
return newSingleThreadExecutor(r -> {
Thread t = new Thread(r);
Thread t = threadFactory.newThread(r);
t.setDaemon(true);
t.setName(t.getName() + "-Event");
return t;
});
}

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Inject;
@@ -28,19 +29,6 @@ public class LifecycleModule {
Executor executor;
}
private final ExecutorService ioExecutor;
public LifecycleModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ioExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
}
@Provides
@Singleton
ShutdownManager provideShutdownManager() {
@@ -57,7 +45,16 @@ public class LifecycleModule {
@Provides
@Singleton
@IoExecutor
Executor provideIoExecutor(LifecycleManager lifecycleManager) {
Executor provideIoExecutor(LifecycleManager lifecycleManager,
ThreadFactory threadFactory) {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ExecutorService ioExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 60, SECONDS, queue, threadFactory, policy);
lifecycleManager.registerForShutdown(ioExecutor);
return ioExecutor;
}

View File

@@ -20,12 +20,15 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
class MailboxApiCallerImpl implements MailboxApiCaller {
private final TaskScheduler taskScheduler;
private final MailboxConfig mailboxConfig;
private final Executor ioExecutor;
@Inject
MailboxApiCallerImpl(TaskScheduler taskScheduler,
MailboxConfig mailboxConfig,
@IoExecutor Executor ioExecutor) {
this.taskScheduler = taskScheduler;
this.mailboxConfig = mailboxConfig;
this.ioExecutor = ioExecutor;
}
@@ -49,7 +52,8 @@ class MailboxApiCallerImpl implements MailboxApiCaller {
private boolean cancelled = false;
@GuardedBy("lock")
private long retryIntervalMs = MIN_RETRY_INTERVAL_MS;
private long retryIntervalMs =
mailboxConfig.getApiCallerMinRetryInterval();
private Task(ApiCall apiCall) {
this.apiCall = apiCall;
@@ -74,8 +78,9 @@ class MailboxApiCallerImpl implements MailboxApiCaller {
scheduledTask = taskScheduler.schedule(this::callApi,
ioExecutor, retryIntervalMs, MILLISECONDS);
// Increase the retry interval each time we retry
retryIntervalMs =
min(MAX_RETRY_INTERVAL_MS, retryIntervalMs * 2);
retryIntervalMs = min(
mailboxConfig.getApiCallerMaxRetryInterval(),
retryIntervalMs * 2);
}
} else {
synchronized (lock) {

View File

@@ -0,0 +1,24 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.plugin.Plugin;
interface MailboxConfig {
/**
* The minimum interval between API call retries in milliseconds.
*/
long getApiCallerMinRetryInterval();
/**
* The maximum interval between API call retries in milliseconds.
*/
long getApiCallerMaxRetryInterval();
/**
* How long (in milliseconds) the Tor plugin needs to be continuously
* {@link Plugin.State#ACTIVE active} before we assume our contacts can
* reach our hidden service.
*/
long getTorReachabilityPeriod();
}

View File

@@ -0,0 +1,30 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
@Immutable
@NotNullByDefault
class MailboxConfigImpl implements MailboxConfig {
@Inject
MailboxConfigImpl() {
}
@Override
public long getApiCallerMinRetryInterval() {
return MailboxApiCaller.MIN_RETRY_INTERVAL_MS;
}
@Override
public long getApiCallerMaxRetryInterval() {
return MailboxApiCaller.MAX_RETRY_INTERVAL_MS;
}
@Override
public long getTorReachabilityPeriod() {
return TorReachabilityMonitor.REACHABILITY_PERIOD_MS;
}
}

View File

@@ -4,7 +4,11 @@ import dagger.Module;
import dagger.Provides;
@Module
public class UrlConverterModule {
public class ModularMailboxModule {
@Provides
MailboxConfig provideMailboxConfig(MailboxConfigImpl mailboxConfig) {
return mailboxConfig;
}
@Provides
UrlConverter provideUrlConverter(UrlConverterImpl urlConverter) {

View File

@@ -32,6 +32,7 @@ class TorReachabilityMonitorImpl
private final Executor ioExecutor;
private final TaskScheduler taskScheduler;
private final MailboxConfig mailboxConfig;
private final PluginManager pluginManager;
private final EventBus eventBus;
private final Object lock = new Object();
@@ -50,10 +51,12 @@ class TorReachabilityMonitorImpl
TorReachabilityMonitorImpl(
@IoExecutor Executor ioExecutor,
TaskScheduler taskScheduler,
MailboxConfig mailboxConfig,
PluginManager pluginManager,
EventBus eventBus) {
this.ioExecutor = ioExecutor;
this.taskScheduler = taskScheduler;
this.mailboxConfig = mailboxConfig;
this.pluginManager = pluginManager;
this.eventBus = eventBus;
}
@@ -110,7 +113,7 @@ class TorReachabilityMonitorImpl
synchronized (lock) {
if (destroyed || task != null) return;
task = taskScheduler.schedule(this::onTorReachable, ioExecutor,
REACHABILITY_PERIOD_MS, MILLISECONDS);
mailboxConfig.getTorReachabilityPeriod(), MILLISECONDS);
}
}

View File

@@ -6,6 +6,7 @@ import org.briarproject.bramble.api.system.TaskScheduler;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -21,18 +22,15 @@ public class DefaultTaskSchedulerModule {
TaskScheduler scheduler;
}
private final ScheduledExecutorService scheduledExecutorService;
public DefaultTaskSchedulerModule() {
@Provides
@Singleton
TaskScheduler provideTaskScheduler(LifecycleManager lifecycleManager,
ThreadFactory threadFactory) {
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ScheduledThreadPoolExecutor.DiscardPolicy();
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, policy);
}
@Provides
@Singleton
TaskScheduler provideTaskScheduler(LifecycleManager lifecycleManager) {
ScheduledExecutorService scheduledExecutorService =
new ScheduledThreadPoolExecutor(1, threadFactory, policy);
lifecycleManager.registerForShutdown(scheduledExecutorService);
return new TaskSchedulerImpl(scheduledExecutorService);
}

View File

@@ -0,0 +1,18 @@
package org.briarproject.bramble.system;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.inject.Singleton;
import dagger.Module;
import dagger.Provides;
@Module
public class DefaultThreadFactoryModule {
@Provides
@Singleton
ThreadFactory provideThreadFactory() {
return Executors.defaultThreadFactory();
}
}