Dedicated executors for plugins and the connection recogniser.

This commit is contained in:
akwizgran
2011-12-08 16:11:24 +00:00
parent caf5f34828
commit 6e080bb35d
26 changed files with 223 additions and 176 deletions

View File

@@ -4,15 +4,16 @@ import java.io.IOException;
import java.util.concurrent.Executor;
import net.sf.briar.api.plugins.Plugin;
import net.sf.briar.api.plugins.PluginExecutor;
public abstract class AbstractPlugin implements Plugin {
protected final Executor executor;
protected final Executor pluginExecutor;
protected boolean started = false; // Locking: this
protected AbstractPlugin(Executor executor) {
this.executor = executor;
protected AbstractPlugin(@PluginExecutor Executor pluginExecutor) {
this.pluginExecutor = pluginExecutor;
}
public synchronized void start() throws IOException {

View File

@@ -21,6 +21,7 @@ import net.sf.briar.api.plugins.BatchPluginCallback;
import net.sf.briar.api.plugins.BatchPluginFactory;
import net.sf.briar.api.plugins.Plugin;
import net.sf.briar.api.plugins.PluginCallback;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.PluginManager;
import net.sf.briar.api.plugins.StreamPlugin;
import net.sf.briar.api.plugins.StreamPluginCallback;
@@ -51,7 +52,7 @@ class PluginManagerImpl implements PluginManager {
};
private final DatabaseComponent db;
private final Executor executor;
private final Executor pluginExecutor;
private final Poller poller;
private final ConnectionDispatcher dispatcher;
private final UiCallback uiCallback;
@@ -59,10 +60,11 @@ class PluginManagerImpl implements PluginManager {
private final List<StreamPlugin> streamPlugins; // Locking: this
@Inject
PluginManagerImpl(DatabaseComponent db, Executor executor, Poller poller,
PluginManagerImpl(DatabaseComponent db,
@PluginExecutor Executor pluginExecutor, Poller poller,
ConnectionDispatcher dispatcher, UiCallback uiCallback) {
this.db = db;
this.executor = executor;
this.pluginExecutor = pluginExecutor;
this.poller = poller;
this.dispatcher = dispatcher;
this.uiCallback = uiCallback;
@@ -83,11 +85,13 @@ class PluginManagerImpl implements PluginManager {
BatchPluginFactory factory =
(BatchPluginFactory) c.newInstance();
BatchCallback callback = new BatchCallback();
BatchPlugin plugin = factory.createPlugin(executor, callback);
BatchPlugin plugin = factory.createPlugin(pluginExecutor,
callback);
if(plugin == null) {
if(LOG.isLoggable(Level.INFO))
if(LOG.isLoggable(Level.INFO)) {
LOG.info(factory.getClass().getSimpleName()
+ " did not create a plugin");
}
continue;
}
TransportId id = plugin.getId();
@@ -121,11 +125,13 @@ class PluginManagerImpl implements PluginManager {
StreamPluginFactory factory =
(StreamPluginFactory) c.newInstance();
StreamCallback callback = new StreamCallback();
StreamPlugin plugin = factory.createPlugin(executor, callback);
StreamPlugin plugin = factory.createPlugin(pluginExecutor,
callback);
if(plugin == null) {
if(LOG.isLoggable(Level.INFO))
if(LOG.isLoggable(Level.INFO)) {
LOG.info(factory.getClass().getSimpleName()
+ " did not create a plugin");
}
continue;
}
TransportId id = plugin.getId();

View File

@@ -1,5 +1,9 @@
package net.sf.briar.plugins;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.PluginManager;
import com.google.inject.AbstractModule;
@@ -9,6 +13,8 @@ public class PluginsModule extends AbstractModule {
@Override
protected void configure() {
bind(Executor.class).annotatedWith(PluginExecutor.class).toInstance(
Executors.newCachedThreadPool());
bind(PluginManager.class).to(
PluginManagerImpl.class).in(Singleton.class);
bind(Poller.class).to(PollerImpl.class);

View File

@@ -19,6 +19,7 @@ import javax.microedition.io.StreamConnectionNotifier;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPlugin;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.protocol.TransportId;
@@ -44,9 +45,9 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
private LocalDevice localDevice = null; // Locking: this
private StreamConnectionNotifier socket = null; // Locking: this
BluetoothPlugin(Executor executor, StreamPluginCallback callback,
long pollingInterval) {
super(executor);
BluetoothPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback, long pollingInterval) {
super(pluginExecutor);
this.callback = callback;
this.pollingInterval = pollingInterval;
}
@@ -69,7 +70,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
} catch(UnsatisfiedLinkError e) {
// On Linux the user may need to install libbluetooth-dev
if(OsUtils.isLinux()) {
executor.execute(new Runnable() {
pluginExecutor.execute(new Runnable() {
public void run() {
callback.showMessage("BLUETOOTH_INSTALL_LIBS");
}
@@ -77,7 +78,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
throw new IOException(e.getMessage());
}
executor.execute(createContactSocketBinder());
pluginExecutor.execute(createContactSocketBinder());
}
@Override
@@ -199,7 +200,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
public synchronized void poll() {
if(!started) return;
executor.execute(createConnectors());
pluginExecutor.execute(createConnectors());
}
private Runnable createConnectors() {

View File

@@ -2,16 +2,17 @@ package net.sf.briar.plugins.bluetooth;
import java.util.concurrent.Executor;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPlugin;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.plugins.StreamPluginFactory;
public class BluetoothPluginFactory implements StreamPluginFactory {
private static final long POLLING_INTERVAL = 3L * 60L * 1000L; // 3 mins
public StreamPlugin createPlugin(Executor executor,
public StreamPlugin createPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback) {
return new BluetoothPlugin(executor, callback, POLLING_INTERVAL);
return new BluetoothPlugin(pluginExecutor, callback, POLLING_INTERVAL);
}
}

View File

@@ -13,6 +13,7 @@ import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.plugins.BatchPlugin;
import net.sf.briar.api.plugins.BatchPluginCallback;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.transport.BatchTransportReader;
import net.sf.briar.api.transport.BatchTransportWriter;
import net.sf.briar.api.transport.TransportConstants;
@@ -35,8 +36,9 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin {
protected abstract void writerFinished(File f);
protected abstract void readerFinished(File f);
protected FilePlugin(Executor executor, BatchPluginCallback callback) {
super(executor);
protected FilePlugin(@PluginExecutor Executor pluginExecutor,
BatchPluginCallback callback) {
super(pluginExecutor);
this.callback = callback;
}
@@ -85,7 +87,7 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin {
protected synchronized void createReaderFromFile(final File f) {
if(!started) return;
executor.execute(new ReaderCreator(f));
pluginExecutor.execute(new ReaderCreator(f));
}
public BatchTransportWriter sendInvitation(int code, long timeout) {

View File

@@ -11,6 +11,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.plugins.BatchPluginCallback;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.util.StringUtils;
@@ -28,9 +29,10 @@ implements RemovableDriveMonitor.Callback {
private final RemovableDriveFinder finder;
private final RemovableDriveMonitor monitor;
RemovableDrivePlugin(Executor executor, BatchPluginCallback callback,
RemovableDriveFinder finder, RemovableDriveMonitor monitor) {
super(executor, callback);
RemovableDrivePlugin(@PluginExecutor Executor pluginExecutor,
BatchPluginCallback callback, RemovableDriveFinder finder,
RemovableDriveMonitor monitor) {
super(pluginExecutor, callback);
this.finder = finder;
this.monitor = monitor;
}

View File

@@ -2,16 +2,17 @@ package net.sf.briar.plugins.file;
import java.util.concurrent.Executor;
import net.sf.briar.api.plugins.BatchPluginCallback;
import net.sf.briar.api.plugins.BatchPlugin;
import net.sf.briar.api.plugins.BatchPluginCallback;
import net.sf.briar.api.plugins.BatchPluginFactory;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.util.OsUtils;
public class RemovableDrivePluginFactory implements BatchPluginFactory {
private static final long POLLING_INTERVAL = 10L * 1000L; // 10 seconds
public BatchPlugin createPlugin(Executor executor,
public BatchPlugin createPlugin(@PluginExecutor Executor pluginExecutor,
BatchPluginCallback callback) {
RemovableDriveFinder finder;
RemovableDriveMonitor monitor;
@@ -33,6 +34,7 @@ public class RemovableDrivePluginFactory implements BatchPluginFactory {
} else {
return null;
}
return new RemovableDrivePlugin(executor, callback, finder, monitor);
return new RemovableDrivePlugin(pluginExecutor, callback, finder,
monitor);
}
}

View File

@@ -14,6 +14,7 @@ import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.transport.StreamTransportConnection;
import net.sf.briar.util.ByteUtils;
@@ -24,9 +25,9 @@ class LanSocketPlugin extends SimpleSocketPlugin {
private static final Logger LOG =
Logger.getLogger(LanSocketPlugin.class.getName());
LanSocketPlugin(Executor executor, StreamPluginCallback callback,
long pollingInterval) {
super(executor, callback, pollingInterval);
LanSocketPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback, long pollingInterval) {
super(pluginExecutor, callback, pollingInterval);
}
@Override

View File

@@ -15,6 +15,7 @@ import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.transport.StreamTransportConnection;
@@ -32,9 +33,9 @@ class SimpleSocketPlugin extends SocketPlugin {
private final long pollingInterval;
SimpleSocketPlugin(Executor executor, StreamPluginCallback callback,
long pollingInterval) {
super(executor, callback);
SimpleSocketPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback, long pollingInterval) {
super(pluginExecutor, callback);
this.pollingInterval = pollingInterval;
}

View File

@@ -2,16 +2,18 @@ package net.sf.briar.plugins.socket;
import java.util.concurrent.Executor;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPlugin;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.plugins.StreamPluginFactory;
public class SimpleSocketPluginFactory implements StreamPluginFactory {
private static final long POLLING_INTERVAL = 5L * 60L * 1000L; // 5 mins
public StreamPlugin createPlugin(Executor executor,
public StreamPlugin createPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback) {
return new SimpleSocketPlugin(executor, callback, POLLING_INTERVAL);
return new SimpleSocketPlugin(pluginExecutor, callback,
POLLING_INTERVAL);
}
}

View File

@@ -9,8 +9,9 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPlugin;
import net.sf.briar.api.plugins.StreamPluginCallback;
import net.sf.briar.api.transport.StreamTransportConnection;
import net.sf.briar.plugins.AbstractPlugin;
@@ -32,15 +33,16 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
protected abstract SocketAddress getLocalSocketAddress();
protected abstract SocketAddress getRemoteSocketAddress(ContactId c);
protected SocketPlugin(Executor executor, StreamPluginCallback callback) {
super(executor);
protected SocketPlugin(@PluginExecutor Executor pluginExecutor,
StreamPluginCallback callback) {
super(pluginExecutor);
this.callback = callback;
}
@Override
public synchronized void start() throws IOException {
super.start();
executor.execute(createBinder());
pluginExecutor.execute(createBinder());
}
private Runnable createBinder() {
@@ -140,7 +142,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
if(!shouldPoll()) throw new UnsupportedOperationException();
if(!started) return;
for(ContactId c : callback.getRemoteProperties().keySet()) {
executor.execute(createConnector(c));
pluginExecutor.execute(createConnector(c));
}
}

View File

@@ -2,6 +2,7 @@ package net.sf.briar.protocol.batch;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Level;
@@ -57,7 +58,7 @@ class OutgoingBatchConnection {
ProtocolWriter writer = protoFactory.createProtocolWriter(out);
// There should be enough space for a packet
long capacity = conn.getRemainingCapacity();
if(capacity < MAX_PACKET_LENGTH) throw new IOException();
if(capacity < MAX_PACKET_LENGTH) throw new EOFException();
// Write a transport update
TransportUpdate t = db.generateTransportUpdate(contactId);
if(t != null) writer.writeTransportUpdate(t);

View File

@@ -33,6 +33,7 @@ import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportIndex;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.ConnectionRecogniserExecutor;
import net.sf.briar.api.transport.ConnectionWindow;
import net.sf.briar.util.ByteUtils;
@@ -44,9 +45,9 @@ DatabaseListener {
private static final Logger LOG =
Logger.getLogger(ConnectionRecogniserImpl.class.getName());
private final CryptoComponent crypto;
private final DatabaseComponent db;
private final Executor executor;
private final DatabaseComponent db;
private final CryptoComponent crypto;
private final Cipher tagCipher; // Locking: this
private final Set<TransportId> localTransportIds; // Locking: this
private final Map<Bytes, Context> expected; // Locking: this
@@ -54,11 +55,11 @@ DatabaseListener {
private boolean initialised = false; // Locking: this
@Inject
ConnectionRecogniserImpl(CryptoComponent crypto, DatabaseComponent db,
Executor executor) {
this.crypto = crypto;
this.db = db;
ConnectionRecogniserImpl(@ConnectionRecogniserExecutor Executor executor,
DatabaseComponent db, CryptoComponent crypto) {
this.executor = executor;
this.db = db;
this.crypto = crypto;
tagCipher = crypto.getTagCipher();
localTransportIds = new HashSet<TransportId>();
expected = new HashMap<Bytes, Context>();

View File

@@ -1,13 +1,18 @@
package net.sf.briar.transport;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import net.sf.briar.api.transport.ConnectionContextFactory;
import net.sf.briar.api.transport.ConnectionDispatcher;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.ConnectionRecogniserExecutor;
import net.sf.briar.api.transport.ConnectionWindowFactory;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
public class TransportModule extends AbstractModule {
@@ -18,10 +23,14 @@ public class TransportModule extends AbstractModule {
bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class);
bind(ConnectionReaderFactory.class).to(
ConnectionReaderFactoryImpl.class);
bind(ConnectionRecogniser.class).to(ConnectionRecogniserImpl.class);
bind(ConnectionRecogniser.class).to(ConnectionRecogniserImpl.class).in(
Singleton.class);
bind(ConnectionWindowFactory.class).to(
ConnectionWindowFactoryImpl.class);
bind(ConnectionWriterFactory.class).to(
ConnectionWriterFactoryImpl.class);
bind(Executor.class).annotatedWith(
ConnectionRecogniserExecutor.class).toInstance(
Executors.newCachedThreadPool());
}
}