Merge branch 'simpler-threading'.

This merge reduces the number of thread pools.
This commit is contained in:
akwizgran
2014-10-02 18:26:26 +01:00
49 changed files with 947 additions and 1517 deletions

View File

@@ -10,7 +10,6 @@
<item>org.briarproject.messaging.duplex.DuplexMessagingModule</item>
<item>org.briarproject.messaging.simplex.SimplexMessagingModule</item>
<item>org.briarproject.plugins.AndroidPluginsModule</item>
<item>org.briarproject.plugins.PluginsModule</item>
<item>org.briarproject.serial.SerialModule</item>
<item>org.briarproject.system.AndroidSystemModule</item>
<item>org.briarproject.transport.TransportModule</item>

View File

@@ -1,21 +1,13 @@
package org.briarproject.android;
import static android.content.Context.MODE_PRIVATE;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.android.AndroidExecutor;
import org.briarproject.api.android.AndroidNotificationManager;
import org.briarproject.api.android.DatabaseUiExecutor;
import org.briarproject.api.android.ReferenceManager;
import org.briarproject.api.db.DatabaseConfig;
import org.briarproject.api.lifecycle.LifecycleManager;
@@ -28,18 +20,9 @@ import com.google.inject.Provides;
public class AndroidModule extends AbstractModule {
private final ExecutorService databaseUiExecutor;
private final UiCallback uiCallback;
public AndroidModule() {
// The queue is unbounded, so tasks can be dependent
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Use a single thread so DB accesses from the UI don't overlap
databaseUiExecutor = new ThreadPoolExecutor(1, 1, 60, SECONDS, queue,
policy);
// Use a dummy UI callback
uiCallback = new UiCallback() {
@@ -53,10 +36,11 @@ public class AndroidModule extends AbstractModule {
public void showMessage(String... message) {
throw new UnsupportedOperationException();
}
}
};
}
@Override
protected void configure() {
bind(AndroidExecutor.class).to(AndroidExecutorImpl.class).in(
Singleton.class);
@@ -65,12 +49,6 @@ public class AndroidModule extends AbstractModule {
bind(UiCallback.class).toInstance(uiCallback);
}
@Provides @Singleton @DatabaseUiExecutor
Executor getDatabaseUiExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(databaseUiExecutor);
return databaseUiExecutor;
}
@Provides @Singleton
DatabaseConfig getDatabaseConfig(final Application app) {
final File dir = app.getApplicationContext().getDir("db", MODE_PRIVATE);

View File

@@ -23,8 +23,8 @@ import org.briarproject.android.groups.GroupListActivity;
import org.briarproject.api.ContactId;
import org.briarproject.api.Settings;
import org.briarproject.api.android.AndroidNotificationManager;
import org.briarproject.api.android.DatabaseUiExecutor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
@@ -51,7 +51,7 @@ Service, EventListener {
Logger.getLogger(AndroidNotificationManagerImpl.class.getName());
private final DatabaseComponent db;
private final Executor dbUiExecutor;
private final Executor dbExecutor;
private final Context appContext;
private final Map<ContactId, Integer> contactCounts =
new HashMap<ContactId, Integer>(); // Locking: this
@@ -65,9 +65,9 @@ Service, EventListener {
@Inject
public AndroidNotificationManagerImpl(DatabaseComponent db,
@DatabaseUiExecutor Executor dbExecutor, Application app) {
@DatabaseExecutor Executor dbExecutor, Application app) {
this.db = db;
this.dbUiExecutor = dbExecutor;
this.dbExecutor = dbExecutor;
appContext = app.getApplicationContext();
}
@@ -78,7 +78,7 @@ Service, EventListener {
}
private void loadSettings() {
dbUiExecutor.execute(new Runnable() {
dbExecutor.execute(new Runnable() {
public void run() {
try {
settings = db.getSettings();

View File

@@ -5,14 +5,18 @@ import static android.content.Intent.FLAG_ACTIVITY_SINGLE_TOP;
import static android.view.WindowManager.LayoutParams.FLAG_SECURE;
import static android.view.inputmethod.InputMethodManager.HIDE_IMPLICIT_ONLY;
import static org.briarproject.android.TestingConstants.PREVENT_SCREENSHOTS;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.android.BriarService.BriarBinder;
import org.briarproject.android.BriarService.BriarServiceConnection;
import org.briarproject.api.android.DatabaseUiExecutor;
import org.briarproject.api.db.DatabaseConfig;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.lifecycle.LifecycleManager;
import roboguice.activity.RoboActivity;
import android.annotation.SuppressLint;
import android.content.Intent;
@@ -35,7 +39,7 @@ public class BriarActivity extends RoboActivity {
private boolean bound = false;
// Fields that are accessed from background threads must be volatile
@Inject @DatabaseUiExecutor private volatile Executor dbUiExecutor;
@Inject @DatabaseExecutor private volatile Executor dbExecutor;
@Inject private volatile LifecycleManager lifecycleManager;
@Override
@@ -113,7 +117,7 @@ public class BriarActivity extends RoboActivity {
}
protected void runOnDbThread(final Runnable task) {
dbUiExecutor.execute(new Runnable() {
dbExecutor.execute(new Runnable() {
public void run() {
try {
lifecycleManager.waitForDatabase();

View File

@@ -18,9 +18,9 @@ import org.briarproject.R;
import org.briarproject.api.ContactId;
import org.briarproject.api.android.AndroidExecutor;
import org.briarproject.api.android.AndroidNotificationManager;
import org.briarproject.api.android.DatabaseUiExecutor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseConfig;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
@@ -56,7 +56,7 @@ public class BriarService extends RoboService implements EventListener {
// Fields that are accessed from background threads must be volatile
@Inject private volatile LifecycleManager lifecycleManager;
@Inject private volatile AndroidExecutor androidExecutor;
@Inject @DatabaseUiExecutor private volatile Executor dbUiExecutor;
@Inject @DatabaseExecutor private volatile Executor dbExecutor;
@Inject private volatile DatabaseComponent db;
private volatile boolean started = false;
@@ -130,6 +130,7 @@ public class BriarService extends RoboService implements EventListener {
return START_NOT_STICKY; // Don't restart automatically if killed
}
@Override
public IBinder onBind(Intent intent) {
return binder;
}
@@ -170,7 +171,7 @@ public class BriarService extends RoboService implements EventListener {
}
private void showMessageNotification(final GroupId g, final ContactId c) {
dbUiExecutor.execute(new Runnable() {
dbExecutor.execute(new Runnable() {
public void run() {
try {
lifecycleManager.waitForDatabase();

View File

@@ -30,8 +30,8 @@ public class SplashScreenActivity extends RoboSplashActivity {
private static final Logger LOG =
Logger.getLogger(SplashScreenActivity.class.getName());
// This build expires on 12 July 2014
private static final long EXPIRY_DATE = 1405123200 * 1000L;
// This build expires on 8 October 2014
private static final long EXPIRY_DATE = 1412726400 * 1000L;
private long now = System.currentTimeMillis();

View File

@@ -7,7 +7,7 @@ import java.util.concurrent.Executor;
import org.briarproject.api.android.AndroidExecutor;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.duplex.DuplexPluginConfig;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import org.briarproject.api.plugins.simplex.SimplexPluginConfig;
@@ -20,13 +20,9 @@ import org.briarproject.plugins.tor.TorPluginFactory;
import android.app.Application;
import android.content.Context;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class AndroidPluginsModule extends AbstractModule {
@Override
protected void configure() {}
public class AndroidPluginsModule extends PluginsModule {
@Provides
SimplexPluginConfig getSimplexPluginConfig() {
@@ -38,18 +34,16 @@ public class AndroidPluginsModule extends AbstractModule {
}
@Provides
DuplexPluginConfig getDuplexPluginConfig(
@PluginExecutor Executor pluginExecutor,
DuplexPluginConfig getDuplexPluginConfig(@IoExecutor Executor ioExecutor,
AndroidExecutor androidExecutor, Application app,
CryptoComponent crypto, LocationUtils locationUtils) {
Context appContext = app.getApplicationContext();
DuplexPluginFactory bluetooth = new DroidtoothPluginFactory(
pluginExecutor, androidExecutor, appContext,
crypto.getSecureRandom());
DuplexPluginFactory tor = new TorPluginFactory(pluginExecutor,
appContext, locationUtils);
DuplexPluginFactory lan = new AndroidLanTcpPluginFactory(
pluginExecutor, appContext);
DuplexPluginFactory bluetooth = new DroidtoothPluginFactory(ioExecutor,
androidExecutor, appContext, crypto.getSecureRandom());
DuplexPluginFactory tor = new TorPluginFactory(ioExecutor, appContext,
locationUtils);
DuplexPluginFactory lan = new AndroidLanTcpPluginFactory(ioExecutor,
appContext);
final Collection<DuplexPluginFactory> factories =
Arrays.asList(bluetooth, tor, lan);
return new DuplexPluginConfig() {

View File

@@ -63,7 +63,7 @@ class DroidtoothPlugin implements DuplexPlugin {
private static final String DISCOVERY_FINISHED =
"android.bluetooth.adapter.action.DISCOVERY_FINISHED";
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final AndroidExecutor androidExecutor;
private final Context appContext;
private final SecureRandom secureRandom;
@@ -80,11 +80,11 @@ class DroidtoothPlugin implements DuplexPlugin {
// Non-null if the plugin started successfully
private volatile BluetoothAdapter adapter = null;
DroidtoothPlugin(Executor pluginExecutor, AndroidExecutor androidExecutor,
DroidtoothPlugin(Executor ioExecutor, AndroidExecutor androidExecutor,
Context appContext, SecureRandom secureRandom, Clock clock,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency,
long pollingInterval) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.androidExecutor = androidExecutor;
this.appContext = appContext;
this.secureRandom = secureRandom;
@@ -147,7 +147,7 @@ class DroidtoothPlugin implements DuplexPlugin {
}
private void bind() {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(!isRunning()) return;
if(LOG.isLoggable(INFO))
@@ -256,7 +256,7 @@ class DroidtoothPlugin implements DuplexPlugin {
if(StringUtils.isNullOrEmpty(address)) continue;
final String uuid = e.getValue().get("uuid");
if(StringUtils.isNullOrEmpty(uuid)) continue;
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(!running) return;
BluetoothSocket s = connect(address, uuid);

View File

@@ -19,16 +19,16 @@ public class DroidtoothPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final AndroidExecutor androidExecutor;
private final Context appContext;
private final SecureRandom secureRandom;
private final Clock clock;
public DroidtoothPluginFactory(Executor pluginExecutor,
public DroidtoothPluginFactory(Executor ioExecutor,
AndroidExecutor androidExecutor, Context appContext,
SecureRandom secureRandom) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.androidExecutor = androidExecutor;
this.appContext = appContext;
this.secureRandom = secureRandom;
@@ -40,7 +40,7 @@ public class DroidtoothPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new DroidtoothPlugin(pluginExecutor, androidExecutor, appContext,
return new DroidtoothPlugin(ioExecutor, androidExecutor, appContext,
secureRandom, clock, callback, MAX_FRAME_LENGTH, MAX_LATENCY,
POLLING_INTERVAL);
}

View File

@@ -25,10 +25,10 @@ class AndroidLanTcpPlugin extends LanTcpPlugin {
private volatile BroadcastReceiver networkStateReceiver = null;
AndroidLanTcpPlugin(Executor pluginExecutor, Context appContext,
AndroidLanTcpPlugin(Executor ioExecutor, Context appContext,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency,
long pollingInterval) {
super(pluginExecutor, callback, maxFrameLength, maxLatency,
super(ioExecutor, callback, maxFrameLength, maxLatency,
pollingInterval);
this.appContext = appContext;
}

View File

@@ -15,12 +15,11 @@ public class AndroidLanTcpPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final Context appContext;
public AndroidLanTcpPluginFactory(Executor pluginExecutor,
Context appContext) {
this.pluginExecutor = pluginExecutor;
public AndroidLanTcpPluginFactory(Executor ioExecutor, Context appContext) {
this.ioExecutor = ioExecutor;
this.appContext = appContext;
}
@@ -29,7 +28,7 @@ public class AndroidLanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new AndroidLanTcpPlugin(pluginExecutor, appContext, callback,
return new AndroidLanTcpPlugin(ioExecutor, appContext, callback,
MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL);
}
}

View File

@@ -72,7 +72,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
private static final Logger LOG =
Logger.getLogger(TorPlugin.class.getName());
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final Context appContext;
private final LocationUtils locationUtils;
private final DuplexPluginCallback callback;
@@ -89,10 +89,10 @@ class TorPlugin implements DuplexPlugin, EventHandler {
private volatile TorControlConnection controlConnection = null;
private volatile BroadcastReceiver networkStateReceiver = null;
TorPlugin(Executor pluginExecutor, Context appContext,
TorPlugin(Executor ioExecutor, Context appContext,
LocationUtils locationUtils, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.appContext = appContext;
this.locationUtils = locationUtils;
this.callback = callback;
@@ -351,7 +351,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
}
private void bind() {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
// If there's already a port number stored in config, reuse it
String portString = callback.getConfig().get("port");
@@ -379,7 +379,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
c.put("port", localPort);
callback.mergeConfig(c);
// Create a hidden service if necessary
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
publishHiddenService(localPort);
}
@@ -506,7 +506,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
}
private void connectAndCallBack(final ContactId c) {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
DuplexTransportConnection d = createConnection(c);
if(d != null) callback.outgoingConnectionCreated(c, d);

View File

@@ -21,13 +21,13 @@ public class TorPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final Context appContext;
private final LocationUtils locationUtils;
public TorPluginFactory(Executor pluginExecutor, Context appContext,
public TorPluginFactory(Executor ioExecutor, Context appContext,
LocationUtils locationUtils) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.appContext = appContext;
this.locationUtils = locationUtils;
}
@@ -42,7 +42,7 @@ public class TorPluginFactory implements DuplexPluginFactory {
LOG.info("Tor is not supported on this architecture");
return null;
}
return new TorPlugin(pluginExecutor,appContext, locationUtils,
callback, MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL);
return new TorPlugin(ioExecutor,appContext, locationUtils, callback,
MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL);
}
}

View File

@@ -1,19 +0,0 @@
package org.briarproject.api.android;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import com.google.inject.BindingAnnotation;
/**
* Annotation for injecting the executor for accessing the database from the UI.
*/
@BindingAnnotation
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface DatabaseUiExecutor {}

View File

@@ -1,4 +1,4 @@
package org.briarproject.api.plugins;
package org.briarproject.api.lifecycle;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
@@ -10,8 +10,8 @@ import java.lang.annotation.Target;
import com.google.inject.BindingAnnotation;
/** Annotation for injecting the executor used by transport plugins. */
/** Annotation for injecting the executor used by long-lived IO tasks. */
@BindingAnnotation
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface PluginExecutor {}
public @interface IoExecutor {}

View File

@@ -1,17 +0,0 @@
package org.briarproject.api.reliability;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import com.google.inject.BindingAnnotation;
/** Annotation for injecting the executor used by reliability layers. */
@BindingAnnotation
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface ReliabilityExecutor {}

View File

@@ -1,19 +0,0 @@
package org.briarproject.api.transport;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import com.google.inject.BindingAnnotation;
/**
* Annotation for injecting the executor for recognising incoming connections.
*/
@BindingAnnotation
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface IncomingConnectionExecutor {}

View File

@@ -38,29 +38,22 @@ import org.briarproject.api.transport.TemporarySecret;
* terminated by calling either {@link #abortTransaction(T)} or
* {@link #commitTransaction(T)}, even if an exception is thrown.
* <p>
* Locking is provided by the DatabaseComponent implementation. To prevent
* deadlock, locks must be acquired in the following (alphabetical) order:
* <ul>
* <li> contact
* <li> identity
* <li> message
* <li> retention
* <li> setting
* <li> subscription
* <li> transport
* <li> window
* </ul>
* If table A has a foreign key pointing to table B, we get a read lock on A to
* read A, a write lock on A to write A, and write locks on A and B to write B.
* Read-write locking is provided by the DatabaseComponent implementation.
*/
interface Database<T> {
/** Opens the database and returns true if the database already existed. */
/**
* Opens the database and returns true if the database already existed.
* <p>
* Locking: write.
*/
boolean open() throws DbException, IOException;
/**
* Prevents new transactions from starting, waits for all current
* transactions to finish, and closes the database.
* <p>
* Locking: write.
*/
void close() throws DbException, IOException;
@@ -92,8 +85,7 @@ interface Database<T> {
* Stores a contact associated with the given local and remote pseudonyms,
* and returns an ID for the contact.
* <p>
* Locking: contact write, message write, retention write,
* subscription write, transport write, window write.
* Locking: write.
*/
ContactId addContact(T txn, Author remote, AuthorId local)
throws DbException;
@@ -101,7 +93,7 @@ interface Database<T> {
/**
* Stores an endpoint.
* <p>
* Locking: window write.
* Locking: write.
*/
void addEndpoint(T txn, Endpoint ep) throws DbException;
@@ -109,29 +101,28 @@ interface Database<T> {
* Subscribes to a group, or returns false if the user already has the
* maximum number of subscriptions.
* <p>
* Locking: message write, subscription write.
* Locking: write.
*/
boolean addGroup(T txn, Group g) throws DbException;
/**
* Stores a local pseudonym.
* <p>
* Locking: contact write, identity write, message write, retention write,
* subscription write, transport write, window write.
* Locking: write.
*/
void addLocalAuthor(T txn, LocalAuthor a) throws DbException;
/**
* Stores a message.
* <p>
* Locking: message write.
* Locking: write.
*/
void addMessage(T txn, Message m, boolean local) throws DbException;
/**
* Records that a message has been offered by the given contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
@@ -139,7 +130,7 @@ interface Database<T> {
* Stores the given temporary secrets and deletes any secrets that have
* been made obsolete.
* <p>
* Locking: window write.
* Locking: write.
*/
void addSecrets(T txn, Collection<TemporarySecret> secrets)
throws DbException;
@@ -147,10 +138,10 @@ interface Database<T> {
/**
* Initialises the status of the given message with respect to the given
* contact.
* <p>
* Locking: write.
* @param ack whether the message needs to be acknowledged.
* @param seen whether the contact has seen the message.
* <p>
* Locking: message write.
*/
void addStatus(T txn, ContactId c, MessageId m, boolean ack, boolean seen)
throws DbException;
@@ -159,7 +150,7 @@ interface Database<T> {
* Stores a transport and returns true if the transport was not previously
* in the database.
* <p>
* Locking: transport write, window write.
* Locking: write.
*/
boolean addTransport(T txn, TransportId t, long maxLatency)
throws DbException;
@@ -167,49 +158,49 @@ interface Database<T> {
/**
* Makes a group visible to the given contact.
* <p>
* Locking: subscription write.
* Locking: write.
*/
void addVisibility(T txn, ContactId c, GroupId g) throws DbException;
/**
* Returns true if the database contains the given contact.
* <p>
* Locking: contact read.
* Locking: read.
*/
boolean containsContact(T txn, AuthorId a) throws DbException;
/**
* Returns true if the database contains the given contact.
* <p>
* Locking: contact read.
* Locking: read.
*/
boolean containsContact(T txn, ContactId c) throws DbException;
/**
* Returns true if the user subscribes to the given group.
* <p>
* Locking: subscription read.
* Locking: read.
*/
boolean containsGroup(T txn, GroupId g) throws DbException;
/**
* Returns true if the database contains the given local pseudonym.
* <p>
* Locking: identity read.
* Locking: read.
*/
boolean containsLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns true if the database contains the given message.
* <p>
* Locking: message read.
* Locking: read.
*/
boolean containsMessage(T txn, MessageId m) throws DbException;
/**
* Returns true if the database contains the given transport.
* <p>
* Locking: transport read.
* Locking: read.
*/
boolean containsTransport(T txn, TransportId t) throws DbException;
@@ -217,7 +208,7 @@ interface Database<T> {
* Returns true if the user subscribes to the given group and the group is
* visible to the given contact.
* <p>
* Locking: subscription read.
* Locking: read.
*/
boolean containsVisibleGroup(T txn, ContactId c, GroupId g)
throws DbException;
@@ -226,7 +217,7 @@ interface Database<T> {
* Returns true if the database contains the given message and the message
* is visible to the given contact.
* <p>
* Locking: message read, subscription read.
* Locking: read.
*/
boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException;
@@ -234,7 +225,7 @@ interface Database<T> {
/**
* Returns the number of messages offered by the given contact.
* <p>
* Locking: message read.
* Locking: read.
*/
int countOfferedMessages(T txn, ContactId c) throws DbException;
@@ -242,49 +233,49 @@ interface Database<T> {
* Returns the status of all groups to which the user subscribes or can
* subscribe, excluding inbox groups.
* <p>
* Locking: subscription read.
* Locking: read.
*/
Collection<GroupStatus> getAvailableGroups(T txn) throws DbException;
/**
* Returns the configuration for the given transport.
* <p>
* Locking: transport read.
* Locking: read.
*/
TransportConfig getConfig(T txn, TransportId t) throws DbException;
/**
* Returns the contact with the given ID.
* <p>
* Locking: contact read.
* Locking: read.
*/
Contact getContact(T txn, ContactId c) throws DbException;
/**
* Returns the IDs of all contacts.
* <p>
* Locking: contact read.
* Locking: read.
*/
Collection<ContactId> getContactIds(T txn) throws DbException;
/**
* Returns all contacts.
* <p>
* Locking: contact read, window read.
* Locking: read.
*/
Collection<Contact> getContacts(T txn) throws DbException;
/**
* Returns all contacts associated with the given local pseudonym.
* <p>
* Locking: contact read.
* Locking: read.
*/
Collection<ContactId> getContacts(T txn, AuthorId a) throws DbException;
/**
* Returns all endpoints.
* <p>
* Locking: window read.
* Locking: read.
*/
Collection<Endpoint> getEndpoints(T txn) throws DbException;
@@ -298,14 +289,14 @@ interface Database<T> {
/**
* Returns the group with the given ID, if the user subscribes to it.
* <p>
* Locking: subscription read.
* Locking: read.
*/
Group getGroup(T txn, GroupId g) throws DbException;
/**
* Returns all groups to which the user subscribes.
* <p>
* Locking: subscription read.
* Locking: read.
*/
Collection<Group> getGroups(T txn) throws DbException;
@@ -313,7 +304,7 @@ interface Database<T> {
* Returns the ID of the inbox group for the given contact, or null if no
* inbox group has been set.
* <p>
* Locking: contact read, subscription read.
* Locking: read.
*/
GroupId getInboxGroupId(T txn, ContactId c) throws DbException;
@@ -321,7 +312,7 @@ interface Database<T> {
* Returns the headers of all messages in the inbox group for the given
* contact, or null if no inbox group has been set.
* <p>
* Locking: contact read, identity read, message read, subscription read.
* Locking: read.
*/
Collection<MessageHeader> getInboxMessageHeaders(T txn, ContactId c)
throws DbException;
@@ -329,21 +320,21 @@ interface Database<T> {
/**
* Returns the local pseudonym with the given ID.
* <p>
* Locking: identity read.
* Locking: read.
*/
LocalAuthor getLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns all local pseudonyms.
* <p>
* Locking: identity read.
* Locking: read.
*/
Collection<LocalAuthor> getLocalAuthors(T txn) throws DbException;
/**
* Returns the local transport properties for all transports.
* <p>
* Locking: transport read.
* Locking: read.
*/
Map<TransportId, TransportProperties> getLocalProperties(T txn)
throws DbException;
@@ -351,7 +342,7 @@ interface Database<T> {
/**
* Returns the local transport properties for the given transport.
* <p>
* Locking: transport read.
* Locking: read.
*/
TransportProperties getLocalProperties(T txn, TransportId t)
throws DbException;
@@ -359,14 +350,14 @@ interface Database<T> {
/**
* Returns the body of the message identified by the given ID.
* <p>
* Locking: message read.
* Locking: read.
*/
byte[] getMessageBody(T txn, MessageId m) throws DbException;
/**
* Returns the headers of all messages in the given group.
* <p>
* Locking: message read.
* Locking: read.
*/
Collection<MessageHeader> getMessageHeaders(T txn, GroupId g)
throws DbException;
@@ -375,7 +366,7 @@ interface Database<T> {
* Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages.
* <p>
* Locking: message read.
* Locking: read.
*/
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
@@ -384,7 +375,7 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be offered to the
* given contact, up to the given number of messages.
* <p>
* Locking: message read, subscription read.
* Locking: read.
*/
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -393,7 +384,7 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given total length.
* <p>
* Locking: message read, subscription read.
* Locking: read.
*/
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength)
throws DbException;
@@ -402,7 +393,7 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
* <p>
* Locking: message read.
* Locking: read.
*/
Collection<MessageId> getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -411,7 +402,7 @@ interface Database<T> {
* Returns the IDs of the oldest messages in the database, with a total
* size less than or equal to the given size.
* <p>
* Locking: message read.
* Locking: read.
*/
Collection<MessageId> getOldMessages(T txn, int size) throws DbException;
@@ -420,28 +411,28 @@ interface Database<T> {
* has no parent, or the parent is absent from the database, or the parent
* belongs to a different group.
* <p>
* Locking: message read.
* Locking: read.
*/
MessageId getParent(T txn, MessageId m) throws DbException;
/**
* Returns the message identified by the given ID, in serialised form.
* <p>
* Locking: message read.
* Locking: read.
*/
byte[] getRawMessage(T txn, MessageId m) throws DbException;
/**
* Returns true if the given message is marked as read.
* <p>
* Locking: message read.
* Locking: read.
*/
boolean getReadFlag(T txn, MessageId m) throws DbException;
/**
* Returns all remote properties for the given transport.
* <p>
* Locking: transport read.
* Locking: read.
*/
Map<ContactId, TransportProperties> getRemoteProperties(T txn,
TransportId t) throws DbException;
@@ -451,7 +442,7 @@ interface Database<T> {
* given contact and have been requested by the contact, up to the given
* total length.
* <p>
* Locking: message read, subscription read.
* Locking: read.
*/
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
int maxLength) throws DbException;
@@ -459,7 +450,7 @@ interface Database<T> {
/**
* Returns a retention ack for the given contact, or null if no ack is due.
* <p>
* Locking: retention write.
* Locking: write.
*/
RetentionAck getRetentionAck(T txn, ContactId c) throws DbException;
@@ -467,7 +458,7 @@ interface Database<T> {
* Returns a retention update for the given contact and updates its expiry
* time using the given latency, or returns null if no update is due.
* <p>
* Locking: message read, retention write.
* Locking: write.
*/
RetentionUpdate getRetentionUpdate(T txn, ContactId c, long maxLatency)
throws DbException;
@@ -475,21 +466,21 @@ interface Database<T> {
/**
* Returns all temporary secrets.
* <p>
* Locking: window read.
* Locking: read.
*/
Collection<TemporarySecret> getSecrets(T txn) throws DbException;
/**
* Returns all settings.
* <p>
* Locking: setting read.
* Locking: read.
*/
Settings getSettings(T txn) throws DbException;
/**
* Returns all contacts who subscribe to the given group.
* <p>
* Locking: subscription read.
* Locking: read.
*/
Collection<Contact> getSubscribers(T txn, GroupId g) throws DbException;
@@ -497,7 +488,7 @@ interface Database<T> {
* Returns a subscription ack for the given contact, or null if no ack is
* due.
* <p>
* Locking: subscription write.
* Locking: write.
*/
SubscriptionAck getSubscriptionAck(T txn, ContactId c) throws DbException;
@@ -505,7 +496,7 @@ interface Database<T> {
* Returns a subscription update for the given contact and updates its
* expiry time using the given latency, or returns null if no update is due.
* <p>
* Locking: subscription write.
* Locking: write.
*/
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c,
long maxLatency) throws DbException;
@@ -514,7 +505,7 @@ interface Database<T> {
* Returns a collection of transport acks for the given contact, or null if
* no acks are due.
* <p>
* Locking: transport write.
* Locking: write.
*/
Collection<TransportAck> getTransportAcks(T txn, ContactId c)
throws DbException;
@@ -522,7 +513,7 @@ interface Database<T> {
/**
* Returns the maximum latencies of all local transports.
* <p>
* Locking: transport read.
* Locking: read.
*/
Map<TransportId, Long> getTransportLatencies(T txn) throws DbException;
@@ -531,7 +522,7 @@ interface Database<T> {
* updates their expiry times using the given latency, or returns null if
* no updates are due.
* <p>
* Locking: transport write.
* Locking: write.
*/
Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c,
long maxLatency) throws DbException;
@@ -539,14 +530,14 @@ interface Database<T> {
/**
* Returns the number of unread messages in each subscribed group.
* <p>
* Locking: message read.
* Locking: read.
*/
Map<GroupId, Integer> getUnreadMessageCounts(T txn) throws DbException;
/**
* Returns the IDs of all contacts to which the given group is visible.
* <p>
* Locking: subscription read.
* Locking: read.
*/
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
@@ -555,7 +546,7 @@ interface Database<T> {
* in the given rotation period and returns the old value, or -1 if the
* counter does not exist.
* <p>
* Locking: window write.
* Locking: write.
*/
long incrementConnectionCounter(T txn, ContactId c, TransportId t,
long period) throws DbException;
@@ -564,7 +555,7 @@ interface Database<T> {
* Increments the retention time versions for all contacts to indicate that
* the database's retention time has changed and updates should be sent.
* <p>
* Locking: retention write.
* Locking: write.
*/
void incrementRetentionVersions(T txn) throws DbException;
@@ -572,7 +563,7 @@ interface Database<T> {
* Marks the given messages as not needing to be acknowledged to the
* given contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void lowerAckFlag(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
@@ -581,7 +572,7 @@ interface Database<T> {
* Marks the given messages as not having been requested by the given
* contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void lowerRequestedFlag(T txn, ContactId c, Collection<MessageId> requested)
throws DbException;
@@ -590,7 +581,7 @@ interface Database<T> {
* Merges the given configuration with the existing configuration for the
* given transport.
* <p>
* Locking: transport write.
* Locking: write.
*/
void mergeConfig(T txn, TransportId t, TransportConfig config)
throws DbException;
@@ -599,7 +590,7 @@ interface Database<T> {
* Merges the given properties with the existing local properties for the
* given transport.
* <p>
* Locking: transport write.
* Locking: write.
*/
void mergeLocalProperties(T txn, TransportId t, TransportProperties p)
throws DbException;
@@ -607,36 +598,35 @@ interface Database<T> {
/**
* Merges the given settings with the existing settings.
* <p>
* Locking: setting write.
* Locking: write.
*/
void mergeSettings(T txn, Settings s) throws DbException;
/**
* Marks a message as needing to be acknowledged to the given contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void raiseAckFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been requested by the given contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void raiseRequestedFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been seen by the given contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a contact from the database.
* <p>
* Locking: contact write, message write, retention write,
* subscription write, transport write, window write.
* Locking: write.
*/
void removeContact(T txn, ContactId c) throws DbException;
@@ -644,7 +634,7 @@ interface Database<T> {
* Unsubscribes from a group. Any messages belonging to the group are
* deleted from the database.
* <p>
* Locking: message write, subscription write.
* Locking: write.
*/
void removeGroup(T txn, GroupId g) throws DbException;
@@ -652,15 +642,14 @@ interface Database<T> {
* Removes a local pseudonym (and all associated contacts) from the
* database.
* <p>
* Locking: contact write, identity write, message write, retention write,
* subscription write, transport write, window write.
* Locking: write.
*/
void removeLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Removes a message (and all associated state) from the database.
* <p>
* Locking: message write.
* Locking: write.
*/
void removeMessage(T txn, MessageId m) throws DbException;
@@ -668,7 +657,7 @@ interface Database<T> {
* Removes an offered message that was offered by the given contact, or
* returns false if there is no such message.
* <p>
* Locking: message write.
* Locking: write.
*/
boolean removeOfferedMessage(T txn, ContactId c, MessageId m)
throws DbException;
@@ -677,7 +666,7 @@ interface Database<T> {
* Removes the given offered messages that were offered by the given
* contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
@@ -685,14 +674,14 @@ interface Database<T> {
/**
* Removes a transport (and all associated state) from the database.
* <p>
* Locking: transport write, window write.
* Locking: write.
*/
void removeTransport(T txn, TransportId t) throws DbException;
/**
* Makes a group invisible to the given contact.
* <p>
* Locking: subscription write.
* Locking: write.
*/
void removeVisibility(T txn, ContactId c, GroupId g) throws DbException;
@@ -700,7 +689,7 @@ interface Database<T> {
* Resets the transmission count and expiry time of the given message with
* respect to the given contact.
* <p>
* Locking: message write.
* Locking: write.
*/
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
@@ -708,7 +697,7 @@ interface Database<T> {
* Sets the connection reordering window for the given endpoint in the
* given rotation period.
* <p>
* Locking: window write.
* Locking: write.
*/
void setConnectionWindow(T txn, ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException;
@@ -718,7 +707,7 @@ interface Database<T> {
* true, unless an update with an equal or higher version number has
* already been received from the contact.
* <p>
* Locking: message write, subscription write.
* Locking: write.
*/
boolean setGroups(T txn, ContactId c, Collection<Group> groups,
long version) throws DbException;
@@ -727,14 +716,14 @@ interface Database<T> {
* Makes a group visible to the given contact, adds it to the contact's
* subscriptions, and sets it as the inbox group for the contact.
* <p>
* Locking: subscription write.
* Locking: write.
*/
public void setInboxGroup(T txn, ContactId c, Group g) throws DbException;
/**
* Marks a message as read or unread.
* <p>
* Locking: message write.
* Locking: write.
*/
void setReadFlag(T txn, MessageId m, boolean read) throws DbException;
@@ -742,7 +731,7 @@ interface Database<T> {
* Sets the remote transport properties for the given contact, replacing
* any existing properties.
* <p>
* Locking: transport write.
* Locking: write.
*/
void setRemoteProperties(T txn, ContactId c,
Map<TransportId, TransportProperties> p) throws DbException;
@@ -753,7 +742,7 @@ interface Database<T> {
* unless an update with an equal or higher version number has already been
* received from the contact.
* <p>
* Locking: transport write.
* Locking: write.
*/
boolean setRemoteProperties(T txn, ContactId c, TransportId t,
TransportProperties p, long version) throws DbException;
@@ -763,7 +752,7 @@ interface Database<T> {
* true, unless an update with an equal or higher version number has
* already been received from the contact.
* <p>
* Locking: retention write.
* Locking: write.
*/
boolean setRetentionTime(T txn, ContactId c, long retention, long version)
throws DbException;
@@ -772,7 +761,7 @@ interface Database<T> {
* Records a retention ack from the given contact for the given version,
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: retention write.
* Locking: write.
*/
void setRetentionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
@@ -781,7 +770,7 @@ interface Database<T> {
* Records a subscription ack from the given contact for the given version,
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: subscription write.
* Locking: write.
*/
void setSubscriptionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
@@ -790,7 +779,7 @@ interface Database<T> {
* Records a transport ack from the give contact for the given version,
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: transport write.
* Locking: write.
*/
void setTransportUpdateAcked(T txn, ContactId c, TransportId t,
long version) throws DbException;
@@ -798,7 +787,7 @@ interface Database<T> {
/**
* Makes a group visible or invisible to future contacts by default.
* <p>
* Locking: subscription write.
* Locking: write.
*/
void setVisibleToAll(T txn, GroupId g, boolean all) throws DbException;
@@ -807,7 +796,7 @@ interface Database<T> {
* with respect to the given contact, using the latency of the transport
* over which it was sent.
* <p>
* Locking: message write.
* Locking: write.
*/
void updateExpiryTime(T txn, ContactId c, MessageId m, long maxLatency)
throws DbException;

File diff suppressed because it is too large Load Diff

View File

@@ -25,9 +25,6 @@ import com.google.inject.Provides;
public class DatabaseModule extends AbstractModule {
/** The maximum number of executor threads. */
private static final int MAX_EXECUTOR_THREADS = 10;
private final ExecutorService databaseExecutor;
public DatabaseModule() {
@@ -36,9 +33,9 @@ public class DatabaseModule extends AbstractModule {
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create a limited # of threads and keep them in the pool for 60 secs
databaseExecutor = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
60, SECONDS, queue, policy);
// Use a single thread and keep it in the pool for 60 secs
databaseExecutor = new ThreadPoolExecutor(0, 1, 60, SECONDS, queue,
policy);
}
protected void configure() {

View File

@@ -71,8 +71,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (key))";
// Locking: identity
// Dependents: contact, message, retention, subscription, transport, window
private static final String CREATE_LOCAL_AUTHORS =
"CREATE TABLE localAuthors"
+ " (authorId HASH NOT NULL,"
@@ -82,8 +80,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " created BIGINT NOT NULL,"
+ " PRIMARY KEY (authorId))";
// Locking: contact
// Dependents: message, retention, subscription, transport, window
private static final String CREATE_CONTACTS =
"CREATE TABLE contacts"
+ " (contactId COUNTER,"
@@ -97,8 +93,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES localAuthors (authorId)"
+ " ON DELETE CASCADE)";
// Locking: subscription
// Dependents: message
private static final String CREATE_GROUPS =
"CREATE TABLE groups"
+ " (groupId HASH NOT NULL,"
@@ -107,7 +101,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " visibleToAll BOOLEAN NOT NULL,"
+ " PRIMARY KEY (groupId))";
// Locking: subscription
private static final String CREATE_GROUP_VISIBILITIES =
"CREATE TABLE groupVisibilities"
+ " (contactId INT NOT NULL,"
@@ -121,7 +114,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES groups (groupId)"
+ " ON DELETE CASCADE)";
// Locking: subscription
private static final String CREATE_CONTACT_GROUPS =
"CREATE TABLE contactGroups"
+ " (contactId INT NOT NULL,"
@@ -133,7 +125,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: subscription
private static final String CREATE_GROUP_VERSIONS =
"CREATE TABLE groupVersions"
+ " (contactId INT NOT NULL,"
@@ -148,7 +139,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: message
private static final String CREATE_MESSAGES =
"CREATE TABLE messages"
+ " (messageId HASH NOT NULL,"
@@ -182,7 +172,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: message
private static final String CREATE_STATUSES =
"CREATE TABLE statuses"
+ " (messageId HASH NOT NULL,"
@@ -206,7 +195,6 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_STATUSES_BY_CONTACT =
"CREATE INDEX statusesByContact ON statuses (contactId)";
// Locking: retention
private static final String CREATE_RETENTION_VERSIONS =
"CREATE TABLE retentionVersions"
+ " (contactId INT NOT NULL,"
@@ -222,15 +210,12 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: transport
// Dependents: window
private static final String CREATE_TRANSPORTS =
"CREATE TABLE transports"
+ " (transportId VARCHAR NOT NULL,"
+ " maxLatency BIGINT NOT NULL,"
+ " PRIMARY KEY (transportId))";
// Locking: transport
private static final String CREATE_TRANSPORT_CONFIGS =
"CREATE TABLE transportConfigs"
+ " (transportId VARCHAR NOT NULL,"
@@ -241,7 +226,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: transport
private static final String CREATE_TRANSPORT_PROPS =
"CREATE TABLE transportProperties"
+ " (transportId VARCHAR NOT NULL,"
@@ -252,7 +236,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: transport
private static final String CREATE_TRANSPORT_VERSIONS =
"CREATE TABLE transportVersions"
+ " (contactId INT NOT NULL,"
@@ -269,7 +252,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: transport
private static final String CREATE_CONTACT_TRANSPORT_PROPS =
"CREATE TABLE contactTransportProperties"
+ " (contactId INT NOT NULL,"
@@ -281,7 +263,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: transport
private static final String CREATE_CONTACT_TRANSPORT_VERSIONS =
"CREATE TABLE contactTransportVersions"
+ " (contactId INT NOT NULL,"
@@ -293,7 +274,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: window
private static final String CREATE_ENDPOINTS =
"CREATE TABLE endpoints"
+ " (contactId INT NOT NULL,"
@@ -308,7 +288,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: window
private static final String CREATE_SECRETS =
"CREATE TABLE secrets"
+ " (contactId INT NOT NULL,"
@@ -1098,8 +1077,8 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean containsVisibleGroup(Connection txn, ContactId c,
GroupId g) throws DbException {
public boolean containsVisibleGroup(Connection txn, ContactId c, GroupId g)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {

View File

@@ -1,18 +1,49 @@
package org.briarproject.lifecycle;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.lifecycle.ShutdownManager;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class LifecycleModule extends AbstractModule {
private final ExecutorService ioExecutor;
public LifecycleModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ioExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(LifecycleManager.class).to(
LifecycleManagerImpl.class).in(Singleton.class);
bind(ShutdownManager.class).to(
ShutdownManagerImpl.class).in(Singleton.class);
}
@Provides @Singleton @IoExecutor
Executor getIoExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(ioExecutor);
return ioExecutor;
}
}

View File

@@ -276,7 +276,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveAck implements Runnable {
private final Ack ack;
@@ -315,7 +315,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveMessage implements Runnable {
private final Message message;
@@ -334,7 +334,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveOffer implements Runnable {
private final Offer offer;
@@ -353,7 +353,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveRequest implements Runnable {
private final Request request;
@@ -372,7 +372,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
@@ -391,7 +391,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
@@ -410,7 +410,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
@@ -429,7 +429,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
@@ -448,7 +448,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
@@ -467,7 +467,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update;
@@ -486,7 +486,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateAck implements Runnable {
public void run() {
@@ -525,7 +525,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateBatch implements Runnable {
public void run() {
@@ -564,7 +564,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateOffer implements Runnable {
public void run() {
@@ -603,7 +603,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateRequest implements Runnable {
public void run() {
@@ -642,7 +642,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
@@ -679,7 +679,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
@@ -717,7 +717,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
@@ -754,7 +754,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
@@ -792,7 +792,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
@@ -830,7 +830,7 @@ abstract class DuplexConnection implements EventListener {
}
}
// This task runs on a database thread
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {

View File

@@ -23,9 +23,9 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
@@ -49,7 +49,7 @@ class PluginManagerImpl implements PluginManager {
private static final Logger LOG =
Logger.getLogger(PluginManagerImpl.class.getName());
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final SimplexPluginConfig simplexPluginConfig;
private final DuplexPluginConfig duplexPluginConfig;
private final Clock clock;
@@ -62,12 +62,12 @@ class PluginManagerImpl implements PluginManager {
private final List<DuplexPlugin> duplexPlugins;
@Inject
PluginManagerImpl(@PluginExecutor Executor pluginExecutor,
PluginManagerImpl(@IoExecutor Executor ioExecutor,
SimplexPluginConfig simplexPluginConfig,
DuplexPluginConfig duplexPluginConfig, Clock clock,
DatabaseComponent db, Poller poller,
ConnectionDispatcher dispatcher, UiCallback uiCallback) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.simplexPluginConfig = simplexPluginConfig;
this.duplexPluginConfig = duplexPluginConfig;
this.clock = clock;
@@ -87,14 +87,14 @@ class PluginManagerImpl implements PluginManager {
simplexPluginConfig.getFactories();
final CountDownLatch sLatch = new CountDownLatch(sFactories.size());
for(SimplexPluginFactory factory : sFactories)
pluginExecutor.execute(new SimplexPluginStarter(factory, sLatch));
ioExecutor.execute(new SimplexPluginStarter(factory, sLatch));
// Instantiate and start the duplex plugins
LOG.info("Starting duplex plugins");
Collection<DuplexPluginFactory> dFactories =
duplexPluginConfig.getFactories();
final CountDownLatch dLatch = new CountDownLatch(dFactories.size());
for(DuplexPluginFactory factory : dFactories)
pluginExecutor.execute(new DuplexPluginStarter(factory, dLatch));
ioExecutor.execute(new DuplexPluginStarter(factory, dLatch));
// Wait for the plugins to start
try {
sLatch.await();
@@ -119,11 +119,11 @@ class PluginManagerImpl implements PluginManager {
// Stop the simplex plugins
LOG.info("Stopping simplex plugins");
for(SimplexPlugin plugin : simplexPlugins)
pluginExecutor.execute(new PluginStopper(plugin, latch));
ioExecutor.execute(new PluginStopper(plugin, latch));
// Stop the duplex plugins
LOG.info("Stopping duplex plugins");
for(DuplexPlugin plugin : duplexPlugins)
pluginExecutor.execute(new PluginStopper(plugin, latch));
ioExecutor.execute(new PluginStopper(plugin, latch));
plugins.clear();
simplexPlugins.clear();
duplexPlugins.clear();

View File

@@ -1,18 +1,8 @@
package org.briarproject.plugins;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.PluginManager;
import com.google.inject.AbstractModule;
@@ -20,19 +10,7 @@ import com.google.inject.Provides;
public class PluginsModule extends AbstractModule {
private final ExecutorService pluginExecutor;
public PluginsModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
pluginExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(Poller.class).to(PollerImpl.class);
}
@@ -43,10 +21,4 @@ public class PluginsModule extends AbstractModule {
lifecycleManager.register(pluginManager);
return pluginManager;
}
@Provides @Singleton @PluginExecutor
Executor getPluginExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(pluginExecutor);
return pluginExecutor;
}
}

View File

@@ -9,8 +9,8 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.ConnectionRegistry;
@@ -19,14 +19,14 @@ class PollerImpl implements Poller {
private static final Logger LOG =
Logger.getLogger(PollerImpl.class.getName());
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final ConnectionRegistry connRegistry;
private final Timer timer;
@Inject
PollerImpl(@PluginExecutor Executor pluginExecutor,
ConnectionRegistry connRegistry, Timer timer) {
this.pluginExecutor = pluginExecutor;
PollerImpl(@IoExecutor Executor ioExecutor, ConnectionRegistry connRegistry,
Timer timer) {
this.ioExecutor = ioExecutor;
this.connRegistry = connRegistry;
this.timer = timer;
}
@@ -49,7 +49,7 @@ class PollerImpl implements Poller {
}
public void pollNow(final Plugin p) {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(LOG.isLoggable(INFO))
LOG.info("Polling " + p.getClass().getSimpleName());
@@ -66,6 +66,7 @@ class PollerImpl implements Poller {
this.plugin = plugin;
}
@Override
public void run() {
pollNow(plugin);
schedule(plugin, false);

View File

@@ -25,7 +25,7 @@ public abstract class FilePlugin implements SimplexPlugin {
private static final Logger LOG =
Logger.getLogger(FilePlugin.class.getName());
protected final Executor pluginExecutor;
protected final Executor ioExecutor;
protected final FileUtils fileUtils;
protected final SimplexPluginCallback callback;
protected final int maxFrameLength;
@@ -38,10 +38,10 @@ public abstract class FilePlugin implements SimplexPlugin {
protected abstract void writerFinished(File f);
protected abstract void readerFinished(File f);
protected FilePlugin(Executor pluginExecutor, FileUtils fileUtils,
protected FilePlugin(Executor ioExecutor, FileUtils fileUtils,
SimplexPluginCallback callback, int maxFrameLength,
long maxLatency) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.fileUtils = fileUtils;
this.callback = callback;
this.maxFrameLength = maxFrameLength;
@@ -100,7 +100,7 @@ public abstract class FilePlugin implements SimplexPlugin {
protected void createReaderFromFile(final File f) {
if(!running) return;
pluginExecutor.execute(new ReaderCreator(f));
ioExecutor.execute(new ReaderCreator(f));
}
private class ReaderCreator implements Runnable {

View File

@@ -16,9 +16,9 @@ class LanTcpPlugin extends TcpPlugin {
static final TransportId ID = new TransportId("lan");
LanTcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
super(pluginExecutor, callback, maxFrameLength, maxLatency,
super(ioExecutor, callback, maxFrameLength, maxLatency,
pollingInterval);
}

View File

@@ -13,10 +13,10 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute
private final Executor pluginExecutor;
private final Executor ioExecutor;
public LanTcpPluginFactory(Executor pluginExecutor) {
this.pluginExecutor = pluginExecutor;
public LanTcpPluginFactory(Executor ioExecutor) {
this.ioExecutor = ioExecutor;
}
public TransportId getId() {
@@ -24,7 +24,7 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new LanTcpPlugin(pluginExecutor, callback, MAX_FRAME_LENGTH,
return new LanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL);
}
}

View File

@@ -35,7 +35,7 @@ abstract class TcpPlugin implements DuplexPlugin {
private static final Logger LOG =
Logger.getLogger(TcpPlugin.class.getName());
protected final Executor pluginExecutor;
protected final Executor ioExecutor;
protected final DuplexPluginCallback callback;
protected final int maxFrameLength;
protected final long maxLatency, pollingInterval;
@@ -52,9 +52,9 @@ abstract class TcpPlugin implements DuplexPlugin {
/** Returns true if connections to the given address can be attempted. */
protected abstract boolean isConnectable(InetSocketAddress remote);
protected TcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.callback = callback;
this.maxFrameLength = maxFrameLength;
this.maxLatency = maxLatency;
@@ -76,7 +76,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
protected void bind() {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(!running) return;
ServerSocket ss = null;
@@ -172,7 +172,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
private void connectAndCallBack(final ContactId c) {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
DuplexTransportConnection d = createConnection(c);
if(d != null) callback.outgoingConnectionCreated(c, d);

View File

@@ -20,10 +20,10 @@ class WanTcpPlugin extends TcpPlugin {
private volatile MappingResult mappingResult;
WanTcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
WanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval,
PortMapper portMapper) {
super(pluginExecutor, callback, maxFrameLength, maxLatency,
super(ioExecutor, callback, maxFrameLength, maxLatency,
pollingInterval);
this.portMapper = portMapper;
}

View File

@@ -14,12 +14,12 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final ShutdownManager shutdownManager;
public WanTcpPluginFactory(Executor pluginExecutor,
public WanTcpPluginFactory(Executor ioExecutor,
ShutdownManager shutdownManager) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.shutdownManager = shutdownManager;
}
@@ -28,7 +28,7 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new WanTcpPlugin(pluginExecutor, callback, MAX_FRAME_LENGTH,
return new WanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL,
new PortMapperImpl(shutdownManager));
}

View File

@@ -4,7 +4,7 @@ import java.util.concurrent.Executor;
import javax.inject.Inject;
import org.briarproject.api.reliability.ReliabilityExecutor;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.reliability.ReliabilityLayer;
import org.briarproject.api.reliability.ReliabilityLayerFactory;
import org.briarproject.api.reliability.WriteHandler;
@@ -13,16 +13,16 @@ import org.briarproject.system.SystemClock;
class ReliabilityLayerFactoryImpl implements ReliabilityLayerFactory {
private final Executor executor;
private final Executor ioExecutor;
private final Clock clock;
@Inject
ReliabilityLayerFactoryImpl(@ReliabilityExecutor Executor executor) {
this.executor = executor;
ReliabilityLayerFactoryImpl(@IoExecutor Executor ioExecutor) {
this.ioExecutor = ioExecutor;
clock = new SystemClock();
}
public ReliabilityLayer createReliabilityLayer(WriteHandler writeHandler) {
return new ReliabilityLayerImpl(executor, clock, writeHandler);
return new ReliabilityLayerImpl(ioExecutor, clock, writeHandler);
}
}

View File

@@ -1,46 +1,14 @@
package org.briarproject.reliability;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.reliability.ReliabilityExecutor;
import org.briarproject.api.reliability.ReliabilityLayerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class ReliabilityModule extends AbstractModule {
private final ExecutorService reliabilityExecutor;
public ReliabilityModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
reliabilityExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(ReliabilityLayerFactory.class).to(
ReliabilityLayerFactoryImpl.class);
}
@Provides @Singleton @ReliabilityExecutor
Executor getReliabilityExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(reliabilityExecutor);
return reliabilityExecutor;
}
}

View File

@@ -14,6 +14,7 @@ import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DbException;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
@@ -22,31 +23,30 @@ import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionDispatcher;
import org.briarproject.api.transport.ConnectionRecogniser;
import org.briarproject.api.transport.IncomingConnectionExecutor;
class ConnectionDispatcherImpl implements ConnectionDispatcher {
private static final Logger LOG =
Logger.getLogger(ConnectionDispatcherImpl.class.getName());
private final Executor connExecutor;
private final Executor ioExecutor;
private final ConnectionRecogniser recogniser;
private final SimplexConnectionFactory simplexConnFactory;
private final DuplexConnectionFactory duplexConnFactory;
@Inject
ConnectionDispatcherImpl(@IncomingConnectionExecutor Executor connExecutor,
ConnectionDispatcherImpl(@IoExecutor Executor ioExecutor,
ConnectionRecogniser recogniser,
SimplexConnectionFactory simplexConnFactory,
DuplexConnectionFactory duplexConnFactory) {
this.connExecutor = connExecutor;
this.ioExecutor = ioExecutor;
this.recogniser = recogniser;
this.simplexConnFactory = simplexConnFactory;
this.duplexConnFactory = duplexConnFactory;
}
public void dispatchReader(TransportId t, SimplexTransportReader r) {
connExecutor.execute(new DispatchSimplexConnection(t, r));
ioExecutor.execute(new DispatchSimplexConnection(t, r));
}
public void dispatchWriter(ContactId c, TransportId t,
@@ -56,7 +56,7 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
public void dispatchIncomingConnection(TransportId t,
DuplexTransportConnection d) {
connExecutor.execute(new DispatchDuplexConnection(t, d));
ioExecutor.execute(new DispatchDuplexConnection(t, d));
}
public void dispatchOutgoingConnection(ContactId c, TransportId t,

View File

@@ -1,14 +1,5 @@
package org.briarproject.transport;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.crypto.KeyManager;
@@ -18,26 +9,13 @@ import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRecogniser;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriterFactory;
import org.briarproject.api.transport.IncomingConnectionExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class TransportModule extends AbstractModule {
private final ExecutorService incomingConnectionExecutor;
public TransportModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
incomingConnectionExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class);
bind(ConnectionReaderFactory.class).to(
@@ -55,10 +33,4 @@ public class TransportModule extends AbstractModule {
lifecycleManager.register(keyManager);
return keyManager;
}
@Provides @Singleton @IncomingConnectionExecutor
Executor getIncomingConnectionExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(incomingConnectionExecutor);
return incomingConnectionExecutor;
}
}

View File

@@ -4,21 +4,20 @@ import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.lifecycle.ShutdownManager;
import org.briarproject.util.OsUtils;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
public class DesktopLifecycleModule extends AbstractModule {
public class DesktopLifecycleModule extends LifecycleModule {
@Override
protected void configure() {
bind(LifecycleManager.class).to(
LifecycleManagerImpl.class).in(Singleton.class);
if(OsUtils.isWindows()) {
bind(ShutdownManager.class).to(
WindowsShutdownManagerImpl.class).in(
Singleton.class);
WindowsShutdownManagerImpl.class).in(Singleton.class);
} else {
bind(ShutdownManager.class).to(
ShutdownManagerImpl.class).in(Singleton.class);
ShutdownManagerImpl.class).in(Singleton.class);
}
}
}

View File

@@ -5,8 +5,8 @@ import java.util.Collection;
import java.util.concurrent.Executor;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.ShutdownManager;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.duplex.DuplexPluginConfig;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import org.briarproject.api.plugins.simplex.SimplexPluginConfig;
@@ -19,18 +19,15 @@ import org.briarproject.plugins.modem.ModemPluginFactory;
import org.briarproject.plugins.tcp.LanTcpPluginFactory;
import org.briarproject.plugins.tcp.WanTcpPluginFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class DesktopPluginsModule extends AbstractModule {
public void configure() {}
public class DesktopPluginsModule extends PluginsModule {
@Provides
SimplexPluginConfig getSimplexPluginConfig(
@PluginExecutor Executor pluginExecutor, FileUtils fileUtils) {
SimplexPluginConfig getSimplexPluginConfig(@IoExecutor Executor ioExecutor,
FileUtils fileUtils) {
SimplexPluginFactory removable =
new RemovableDrivePluginFactory(pluginExecutor, fileUtils);
new RemovableDrivePluginFactory(ioExecutor, fileUtils);
final Collection<SimplexPluginFactory> factories =
Arrays.asList(removable);
return new SimplexPluginConfig() {
@@ -41,16 +38,15 @@ public class DesktopPluginsModule extends AbstractModule {
}
@Provides
DuplexPluginConfig getDuplexPluginConfig(
@PluginExecutor Executor pluginExecutor,
DuplexPluginConfig getDuplexPluginConfig(@IoExecutor Executor ioExecutor,
CryptoComponent crypto, ReliabilityLayerFactory reliabilityFactory,
ShutdownManager shutdownManager) {
DuplexPluginFactory bluetooth = new BluetoothPluginFactory(
pluginExecutor, crypto.getSecureRandom());
DuplexPluginFactory modem = new ModemPluginFactory(pluginExecutor,
ioExecutor, crypto.getSecureRandom());
DuplexPluginFactory modem = new ModemPluginFactory(ioExecutor,
reliabilityFactory);
DuplexPluginFactory lan = new LanTcpPluginFactory(pluginExecutor);
DuplexPluginFactory wan = new WanTcpPluginFactory(pluginExecutor,
DuplexPluginFactory lan = new LanTcpPluginFactory(ioExecutor);
DuplexPluginFactory wan = new WanTcpPluginFactory(ioExecutor,
shutdownManager);
final Collection<DuplexPluginFactory> factories =
Arrays.asList(bluetooth, modem, lan, wan);

View File

@@ -42,7 +42,7 @@ class BluetoothPlugin implements DuplexPlugin {
Logger.getLogger(BluetoothPlugin.class.getName());
private static final int UUID_BYTES = 16;
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final Clock clock;
private final SecureRandom secureRandom;
private final DuplexPluginCallback callback;
@@ -54,10 +54,10 @@ class BluetoothPlugin implements DuplexPlugin {
private volatile StreamConnectionNotifier socket = null;
private volatile LocalDevice localDevice = null;
BluetoothPlugin(Executor pluginExecutor, Clock clock,
SecureRandom secureRandom, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
this.pluginExecutor = pluginExecutor;
BluetoothPlugin(Executor ioExecutor, Clock clock, SecureRandom secureRandom,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency,
long pollingInterval) {
this.ioExecutor = ioExecutor;
this.clock = clock;
this.secureRandom = secureRandom;
this.callback = callback;
@@ -96,7 +96,7 @@ class BluetoothPlugin implements DuplexPlugin {
}
private void bind() {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(!running) return;
// Advertise the Bluetooth address to contacts
@@ -197,7 +197,7 @@ class BluetoothPlugin implements DuplexPlugin {
if(StringUtils.isNullOrEmpty(address)) continue;
final String uuid = e.getValue().get("uuid");
if(StringUtils.isNullOrEmpty(uuid)) continue;
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(!running) return;
StreamConnection s = connect(makeUrl(address, uuid));

View File

@@ -16,13 +16,13 @@ public class BluetoothPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final SecureRandom secureRandom;
private final Clock clock;
public BluetoothPluginFactory(Executor pluginExecutor,
public BluetoothPluginFactory(Executor ioExecutor,
SecureRandom secureRandom) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.secureRandom = secureRandom;
clock = new SystemClock();
}
@@ -32,7 +32,7 @@ public class BluetoothPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new BluetoothPlugin(pluginExecutor, clock, secureRandom,
callback, MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL);
return new BluetoothPlugin(ioExecutor, clock, secureRandom, callback,
MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL);
}
}

View File

@@ -11,7 +11,7 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
private static final Logger LOG =
Logger.getLogger(PollingRemovableDriveMonitor.class.getName());
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final RemovableDriveFinder finder;
private final long pollingInterval;
private final Object pollingLock = new Object();
@@ -19,9 +19,9 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
private volatile boolean running = false;
private volatile Callback callback = null;
public PollingRemovableDriveMonitor(Executor pluginExecutor,
public PollingRemovableDriveMonitor(Executor ioExecutor,
RemovableDriveFinder finder, long pollingInterval) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.finder = finder;
this.pollingInterval = pollingInterval;
}
@@ -29,7 +29,7 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
public void start(Callback callback) throws IOException {
this.callback = callback;
running = true;
pluginExecutor.execute(this);
ioExecutor.execute(this);
}
public void stop() throws IOException {

View File

@@ -27,11 +27,11 @@ implements RemovableDriveMonitor.Callback {
private final RemovableDriveFinder finder;
private final RemovableDriveMonitor monitor;
RemovableDrivePlugin(Executor pluginExecutor, FileUtils fileUtils,
RemovableDrivePlugin(Executor ioExecutor, FileUtils fileUtils,
SimplexPluginCallback callback, RemovableDriveFinder finder,
RemovableDriveMonitor monitor, int maxFrameLength,
long maxLatency) {
super(pluginExecutor, fileUtils, callback, maxFrameLength, maxLatency);
super(ioExecutor, fileUtils, callback, maxFrameLength, maxLatency);
this.finder = finder;
this.monitor = monitor;
}

View File

@@ -17,12 +17,12 @@ public class RemovableDrivePluginFactory implements SimplexPluginFactory {
private static final long MAX_LATENCY = 14 * 24 * 60 * 60 * 1000;
private static final long POLLING_INTERVAL = 10 * 1000; // 10 seconds
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final FileUtils fileUtils;
public RemovableDrivePluginFactory(Executor pluginExecutor,
public RemovableDrivePluginFactory(Executor ioExecutor,
FileUtils fileUtils) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.fileUtils = fileUtils;
}
@@ -42,16 +42,16 @@ public class RemovableDrivePluginFactory implements SimplexPluginFactory {
} else if(OsUtils.isMac()) {
// JNotify requires OS X 10.5 or newer, so we have to poll
finder = new MacRemovableDriveFinder();
monitor = new PollingRemovableDriveMonitor(pluginExecutor, finder,
monitor = new PollingRemovableDriveMonitor(ioExecutor, finder,
POLLING_INTERVAL);
} else if(OsUtils.isWindows()) {
finder = new WindowsRemovableDriveFinder();
monitor = new PollingRemovableDriveMonitor(pluginExecutor, finder,
monitor = new PollingRemovableDriveMonitor(ioExecutor, finder,
POLLING_INTERVAL);
} else {
return null;
}
return new RemovableDrivePlugin(pluginExecutor, fileUtils, callback,
return new RemovableDrivePlugin(ioExecutor, fileUtils, callback,
finder, monitor, MAX_FRAME_LENGTH, MAX_LATENCY);
}
}

View File

@@ -32,7 +32,7 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
private static final Logger LOG =
Logger.getLogger(ModemPlugin.class.getName());
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final ModemFactory modemFactory;
private final SerialPortList serialPortList;
private final DuplexPluginCallback callback;
@@ -43,11 +43,11 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
private volatile boolean running = false;
private volatile Modem modem = null;
ModemPlugin(Executor pluginExecutor, ModemFactory modemFactory,
ModemPlugin(Executor ioExecutor, ModemFactory modemFactory,
SerialPortList serialPortList, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval,
boolean shuffle) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.modemFactory = modemFactory;
this.serialPortList = serialPortList;
this.callback = callback;
@@ -112,7 +112,7 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
public void poll(Collection<ContactId> connected) {
if(!connected.isEmpty()) return; // One at a time please
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
poll();
}

View File

@@ -15,14 +15,14 @@ public class ModemPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 60 * 60 * 1000; // 1 hour
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final ModemFactory modemFactory;
private final SerialPortList serialPortList;
public ModemPluginFactory(Executor pluginExecutor,
public ModemPluginFactory(Executor ioExecutor,
ReliabilityLayerFactory reliabilityFactory) {
this.pluginExecutor = pluginExecutor;
modemFactory = new ModemFactoryImpl(pluginExecutor, reliabilityFactory);
this.ioExecutor = ioExecutor;
modemFactory = new ModemFactoryImpl(ioExecutor, reliabilityFactory);
serialPortList = new SerialPortListImpl();
}
@@ -34,7 +34,7 @@ public class ModemPluginFactory implements DuplexPluginFactory {
// This plugin is not enabled by default
String enabled = callback.getConfig().get("enabled");
if(StringUtils.isNullOrEmpty(enabled)) return null;
return new ModemPlugin(pluginExecutor, modemFactory, serialPortList,
return new ModemPlugin(ioExecutor, modemFactory, serialPortList,
callback, MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL,
true);
}

View File

@@ -1,7 +1,10 @@
package org.briarproject;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.lifecycle.Service;
import org.briarproject.api.lifecycle.ShutdownManager;
@@ -10,6 +13,7 @@ import com.google.inject.AbstractModule;
public class TestLifecycleModule extends AbstractModule {
@Override
protected void configure() {
bind(LifecycleManager.class).toInstance(new LifecycleManager() {
@@ -37,5 +41,7 @@ public class TestLifecycleModule extends AbstractModule {
return true;
}
});
bind(Executor.class).annotatedWith(IoExecutor.class).toInstance(
Executors.newCachedThreadPool());
}
}

View File

@@ -25,7 +25,7 @@ import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.GroupId;
import org.briarproject.api.messaging.GroupFactory;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageFactory;
import org.briarproject.api.messaging.MessageVerifier;
@@ -59,7 +59,6 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
private final File testDir = TestUtils.getTestDirectory();
private final File aliceDir = new File(testDir, "alice");
private final File bobDir = new File(testDir, "bob");
private final Group group;
private final TransportId transportId;
private final byte[] initialSecret;
private final long epoch;
@@ -67,8 +66,6 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
private Injector alice, bob;
public SimplexMessagingIntegrationTest() throws Exception {
GroupId groupId = new GroupId(TestUtils.getRandomId());
group = new Group(groupId, "Group", new byte[GROUP_SALT_LENGTH]);
transportId = new TransportId("id");
// Create matching secrets for Alice and Bob
initialSecret = new byte[32];
@@ -77,6 +74,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
epoch = System.currentTimeMillis() - 2 * rotationPeriod;
}
@Override
@Before
public void setUp() {
testDir.mkdirs();
@@ -88,7 +86,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
return Guice.createInjector(new TestDatabaseModule(dir),
new TestLifecycleModule(), new TestSystemModule(),
new CryptoModule(), new DatabaseModule(), new MessagingModule(),
new DuplexMessagingModule(), new SimplexMessagingModule(),
new DuplexMessagingModule(), new SimplexMessagingModule(),
new SerialModule(), new TransportModule());
}
@@ -122,6 +120,8 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
new byte[MAX_PUBLIC_KEY_LENGTH]);
ContactId contactId = db.addContact(bobAuthor, aliceId);
// Add the inbox group
GroupFactory gf = alice.getInstance(GroupFactory.class);
Group group = gf.createGroup("Group", new byte[GROUP_SALT_LENGTH]);
db.addGroup(group);
db.setInboxGroup(contactId, group);
// Add the transport and the endpoint
@@ -181,6 +181,8 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
new byte[MAX_PUBLIC_KEY_LENGTH]);
ContactId contactId = db.addContact(aliceAuthor, bobId);
// Add the inbox group
GroupFactory gf = bob.getInstance(GroupFactory.class);
Group group = gf.createGroup("Group", new byte[GROUP_SALT_LENGTH]);
db.addGroup(group);
db.setInboxGroup(contactId, group);
// Add the transport and the endpoint
@@ -228,6 +230,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
db.close();
}
@Override
@After
public void tearDown() {
TestUtils.deleteTestDirectory(testDir);
@@ -235,7 +238,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
private static class MessageListener implements EventListener {
private boolean messageAdded = false;
private volatile boolean messageAdded = false;
public void eventOccurred(Event e) {
if(e instanceof MessageAddedEvent) messageAdded = true;

View File

@@ -29,7 +29,7 @@ public class PluginManagerImplTest extends BriarTestCase {
public void testStartAndStop() throws Exception {
Clock clock = new SystemClock();
Mockery context = new Mockery();
final Executor pluginExecutor = Executors.newCachedThreadPool();
final Executor ioExecutor = Executors.newCachedThreadPool();
final SimplexPluginConfig simplexPluginConfig =
context.mock(SimplexPluginConfig.class);
final DuplexPluginConfig duplexPluginConfig =
@@ -116,7 +116,7 @@ public class PluginManagerImplTest extends BriarTestCase {
oneOf(simplexPlugin).stop();
oneOf(duplexPlugin).stop();
}});
PluginManagerImpl p = new PluginManagerImpl(pluginExecutor,
PluginManagerImpl p = new PluginManagerImpl(ioExecutor,
simplexPluginConfig, duplexPluginConfig, clock, db, poller,
dispatcher, uiCallback);
// Two plugins should be started and stopped

View File

@@ -15,7 +15,6 @@ import org.briarproject.api.ContactId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.hamcrest.Description;
import org.jmock.Expectations;
import org.jmock.Mockery;
@@ -194,8 +193,7 @@ public class ModemPluginTest extends BriarTestCase {
@Test
public void testPolling() throws Exception {
final ExecutorService pluginExecutor =
Executors.newSingleThreadExecutor();
final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
Mockery context = new Mockery();
final ModemFactory modemFactory = context.mock(ModemFactory.class);
final SerialPortList serialPortList =
@@ -203,7 +201,7 @@ public class ModemPluginTest extends BriarTestCase {
final DuplexPluginCallback callback =
context.mock(DuplexPluginCallback.class);
// Disable shuffling for this test, it confuses jMock
final ModemPlugin plugin = new ModemPlugin(pluginExecutor, modemFactory,
final ModemPlugin plugin = new ModemPlugin(ioExecutor, modemFactory,
serialPortList, callback, 0, 0, 0, false);
final Modem modem = context.mock(Modem.class);
final TransportProperties local = new TransportProperties();
@@ -265,7 +263,7 @@ public class ModemPluginTest extends BriarTestCase {
assertTrue(plugin.start());
plugin.poll(Collections.<ContactId>emptyList());
assertTrue(disposeAction.invoked.await(5, SECONDS));
pluginExecutor.shutdown();
ioExecutor.shutdown();
context.assertIsSatisfied();
}