Merged IncomingConnectionExecutor and PluginExecutor into IoExecutor.

We don't need two separate executors for long-running IO threads.
This commit is contained in:
akwizgran
2014-10-02 18:02:53 +01:00
parent 458c0ca285
commit 941efb4bbe
35 changed files with 172 additions and 223 deletions

View File

@@ -1,18 +1,49 @@
package org.briarproject.lifecycle;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.lifecycle.ShutdownManager;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class LifecycleModule extends AbstractModule {
private final ExecutorService ioExecutor;
public LifecycleModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
ioExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(LifecycleManager.class).to(
LifecycleManagerImpl.class).in(Singleton.class);
bind(ShutdownManager.class).to(
ShutdownManagerImpl.class).in(Singleton.class);
}
@Provides @Singleton @IoExecutor
Executor getIoExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(ioExecutor);
return ioExecutor;
}
}

View File

@@ -23,9 +23,9 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
@@ -49,7 +49,7 @@ class PluginManagerImpl implements PluginManager {
private static final Logger LOG =
Logger.getLogger(PluginManagerImpl.class.getName());
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final SimplexPluginConfig simplexPluginConfig;
private final DuplexPluginConfig duplexPluginConfig;
private final Clock clock;
@@ -62,12 +62,12 @@ class PluginManagerImpl implements PluginManager {
private final List<DuplexPlugin> duplexPlugins;
@Inject
PluginManagerImpl(@PluginExecutor Executor pluginExecutor,
PluginManagerImpl(@IoExecutor Executor ioExecutor,
SimplexPluginConfig simplexPluginConfig,
DuplexPluginConfig duplexPluginConfig, Clock clock,
DatabaseComponent db, Poller poller,
ConnectionDispatcher dispatcher, UiCallback uiCallback) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.simplexPluginConfig = simplexPluginConfig;
this.duplexPluginConfig = duplexPluginConfig;
this.clock = clock;
@@ -87,14 +87,14 @@ class PluginManagerImpl implements PluginManager {
simplexPluginConfig.getFactories();
final CountDownLatch sLatch = new CountDownLatch(sFactories.size());
for(SimplexPluginFactory factory : sFactories)
pluginExecutor.execute(new SimplexPluginStarter(factory, sLatch));
ioExecutor.execute(new SimplexPluginStarter(factory, sLatch));
// Instantiate and start the duplex plugins
LOG.info("Starting duplex plugins");
Collection<DuplexPluginFactory> dFactories =
duplexPluginConfig.getFactories();
final CountDownLatch dLatch = new CountDownLatch(dFactories.size());
for(DuplexPluginFactory factory : dFactories)
pluginExecutor.execute(new DuplexPluginStarter(factory, dLatch));
ioExecutor.execute(new DuplexPluginStarter(factory, dLatch));
// Wait for the plugins to start
try {
sLatch.await();
@@ -119,11 +119,11 @@ class PluginManagerImpl implements PluginManager {
// Stop the simplex plugins
LOG.info("Stopping simplex plugins");
for(SimplexPlugin plugin : simplexPlugins)
pluginExecutor.execute(new PluginStopper(plugin, latch));
ioExecutor.execute(new PluginStopper(plugin, latch));
// Stop the duplex plugins
LOG.info("Stopping duplex plugins");
for(DuplexPlugin plugin : duplexPlugins)
pluginExecutor.execute(new PluginStopper(plugin, latch));
ioExecutor.execute(new PluginStopper(plugin, latch));
plugins.clear();
simplexPlugins.clear();
duplexPlugins.clear();

View File

@@ -1,18 +1,8 @@
package org.briarproject.plugins;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.PluginManager;
import com.google.inject.AbstractModule;
@@ -20,19 +10,7 @@ import com.google.inject.Provides;
public class PluginsModule extends AbstractModule {
private final ExecutorService pluginExecutor;
public PluginsModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
pluginExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(Poller.class).to(PollerImpl.class);
}
@@ -43,10 +21,4 @@ public class PluginsModule extends AbstractModule {
lifecycleManager.register(pluginManager);
return pluginManager;
}
@Provides @Singleton @PluginExecutor
Executor getPluginExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(pluginExecutor);
return pluginExecutor;
}
}

View File

@@ -9,8 +9,8 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.ConnectionRegistry;
@@ -19,14 +19,14 @@ class PollerImpl implements Poller {
private static final Logger LOG =
Logger.getLogger(PollerImpl.class.getName());
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final ConnectionRegistry connRegistry;
private final Timer timer;
@Inject
PollerImpl(@PluginExecutor Executor pluginExecutor,
ConnectionRegistry connRegistry, Timer timer) {
this.pluginExecutor = pluginExecutor;
PollerImpl(@IoExecutor Executor ioExecutor, ConnectionRegistry connRegistry,
Timer timer) {
this.ioExecutor = ioExecutor;
this.connRegistry = connRegistry;
this.timer = timer;
}
@@ -49,7 +49,7 @@ class PollerImpl implements Poller {
}
public void pollNow(final Plugin p) {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(LOG.isLoggable(INFO))
LOG.info("Polling " + p.getClass().getSimpleName());
@@ -66,6 +66,7 @@ class PollerImpl implements Poller {
this.plugin = plugin;
}
@Override
public void run() {
pollNow(plugin);
schedule(plugin, false);

View File

@@ -25,7 +25,7 @@ public abstract class FilePlugin implements SimplexPlugin {
private static final Logger LOG =
Logger.getLogger(FilePlugin.class.getName());
protected final Executor pluginExecutor;
protected final Executor ioExecutor;
protected final FileUtils fileUtils;
protected final SimplexPluginCallback callback;
protected final int maxFrameLength;
@@ -38,10 +38,10 @@ public abstract class FilePlugin implements SimplexPlugin {
protected abstract void writerFinished(File f);
protected abstract void readerFinished(File f);
protected FilePlugin(Executor pluginExecutor, FileUtils fileUtils,
protected FilePlugin(Executor ioExecutor, FileUtils fileUtils,
SimplexPluginCallback callback, int maxFrameLength,
long maxLatency) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.fileUtils = fileUtils;
this.callback = callback;
this.maxFrameLength = maxFrameLength;
@@ -100,7 +100,7 @@ public abstract class FilePlugin implements SimplexPlugin {
protected void createReaderFromFile(final File f) {
if(!running) return;
pluginExecutor.execute(new ReaderCreator(f));
ioExecutor.execute(new ReaderCreator(f));
}
private class ReaderCreator implements Runnable {

View File

@@ -16,9 +16,9 @@ class LanTcpPlugin extends TcpPlugin {
static final TransportId ID = new TransportId("lan");
LanTcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
super(pluginExecutor, callback, maxFrameLength, maxLatency,
super(ioExecutor, callback, maxFrameLength, maxLatency,
pollingInterval);
}

View File

@@ -13,10 +13,10 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute
private final Executor pluginExecutor;
private final Executor ioExecutor;
public LanTcpPluginFactory(Executor pluginExecutor) {
this.pluginExecutor = pluginExecutor;
public LanTcpPluginFactory(Executor ioExecutor) {
this.ioExecutor = ioExecutor;
}
public TransportId getId() {
@@ -24,7 +24,7 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new LanTcpPlugin(pluginExecutor, callback, MAX_FRAME_LENGTH,
return new LanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL);
}
}

View File

@@ -35,7 +35,7 @@ abstract class TcpPlugin implements DuplexPlugin {
private static final Logger LOG =
Logger.getLogger(TcpPlugin.class.getName());
protected final Executor pluginExecutor;
protected final Executor ioExecutor;
protected final DuplexPluginCallback callback;
protected final int maxFrameLength;
protected final long maxLatency, pollingInterval;
@@ -52,9 +52,9 @@ abstract class TcpPlugin implements DuplexPlugin {
/** Returns true if connections to the given address can be attempted. */
protected abstract boolean isConnectable(InetSocketAddress remote);
protected TcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.callback = callback;
this.maxFrameLength = maxFrameLength;
this.maxLatency = maxLatency;
@@ -76,7 +76,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
protected void bind() {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
if(!running) return;
ServerSocket ss = null;
@@ -172,7 +172,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
private void connectAndCallBack(final ContactId c) {
pluginExecutor.execute(new Runnable() {
ioExecutor.execute(new Runnable() {
public void run() {
DuplexTransportConnection d = createConnection(c);
if(d != null) callback.outgoingConnectionCreated(c, d);

View File

@@ -20,10 +20,10 @@ class WanTcpPlugin extends TcpPlugin {
private volatile MappingResult mappingResult;
WanTcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
WanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval,
PortMapper portMapper) {
super(pluginExecutor, callback, maxFrameLength, maxLatency,
super(ioExecutor, callback, maxFrameLength, maxLatency,
pollingInterval);
this.portMapper = portMapper;
}

View File

@@ -14,12 +14,12 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes
private final Executor pluginExecutor;
private final Executor ioExecutor;
private final ShutdownManager shutdownManager;
public WanTcpPluginFactory(Executor pluginExecutor,
public WanTcpPluginFactory(Executor ioExecutor,
ShutdownManager shutdownManager) {
this.pluginExecutor = pluginExecutor;
this.ioExecutor = ioExecutor;
this.shutdownManager = shutdownManager;
}
@@ -28,7 +28,7 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new WanTcpPlugin(pluginExecutor, callback, MAX_FRAME_LENGTH,
return new WanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL,
new PortMapperImpl(shutdownManager));
}

View File

@@ -14,6 +14,7 @@ import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DbException;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
@@ -22,31 +23,30 @@ import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionDispatcher;
import org.briarproject.api.transport.ConnectionRecogniser;
import org.briarproject.api.transport.IncomingConnectionExecutor;
class ConnectionDispatcherImpl implements ConnectionDispatcher {
private static final Logger LOG =
Logger.getLogger(ConnectionDispatcherImpl.class.getName());
private final Executor connExecutor;
private final Executor ioExecutor;
private final ConnectionRecogniser recogniser;
private final SimplexConnectionFactory simplexConnFactory;
private final DuplexConnectionFactory duplexConnFactory;
@Inject
ConnectionDispatcherImpl(@IncomingConnectionExecutor Executor connExecutor,
ConnectionDispatcherImpl(@IoExecutor Executor ioExecutor,
ConnectionRecogniser recogniser,
SimplexConnectionFactory simplexConnFactory,
DuplexConnectionFactory duplexConnFactory) {
this.connExecutor = connExecutor;
this.ioExecutor = ioExecutor;
this.recogniser = recogniser;
this.simplexConnFactory = simplexConnFactory;
this.duplexConnFactory = duplexConnFactory;
}
public void dispatchReader(TransportId t, SimplexTransportReader r) {
connExecutor.execute(new DispatchSimplexConnection(t, r));
ioExecutor.execute(new DispatchSimplexConnection(t, r));
}
public void dispatchWriter(ContactId c, TransportId t,
@@ -56,7 +56,7 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
public void dispatchIncomingConnection(TransportId t,
DuplexTransportConnection d) {
connExecutor.execute(new DispatchDuplexConnection(t, d));
ioExecutor.execute(new DispatchDuplexConnection(t, d));
}
public void dispatchOutgoingConnection(ContactId c, TransportId t,

View File

@@ -1,14 +1,5 @@
package org.briarproject.transport;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Singleton;
import org.briarproject.api.crypto.KeyManager;
@@ -18,26 +9,13 @@ import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRecogniser;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriterFactory;
import org.briarproject.api.transport.IncomingConnectionExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class TransportModule extends AbstractModule {
private final ExecutorService incomingConnectionExecutor;
public TransportModule() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
// Create threads as required and keep them in the pool for 60 seconds
incomingConnectionExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 60, SECONDS, queue, policy);
}
@Override
protected void configure() {
bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class);
bind(ConnectionReaderFactory.class).to(
@@ -55,10 +33,4 @@ public class TransportModule extends AbstractModule {
lifecycleManager.register(keyManager);
return keyManager;
}
@Provides @Singleton @IncomingConnectionExecutor
Executor getIncomingConnectionExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(incomingConnectionExecutor);
return incomingConnectionExecutor;
}
}