Merge branch 'start-plugins-async' into 'master'

Start plugins asynchronously

This prevents other services from getting stuck behind the plugin manager while the Tor plugin is starting, which takes several seconds. The plugin manager waits for each plugin to start before stopping it.

I've also added some canaries to plugins and services to ensure instances aren't started more than once.

See merge request !181
This commit is contained in:
akwizgran
2016-05-11 14:46:55 +00:00
16 changed files with 252 additions and 90 deletions

View File

@@ -53,20 +53,20 @@ class LifecycleManagerImpl implements LifecycleManager {
@Override
public void registerService(Service s) {
if (LOG.isLoggable(INFO))
LOG.info("Registering service " + s.getClass().getName());
LOG.info("Registering service " + s.getClass().getSimpleName());
services.add(s);
}
@Override
public void registerClient(Client c) {
if (LOG.isLoggable(INFO))
LOG.info("Registering client " + c.getClass().getName());
LOG.info("Registering client " + c.getClass().getSimpleName());
clients.add(c);
}
@Override
public void registerForShutdown(ExecutorService e) {
LOG.info("Registering executor");
LOG.info("Registering executor " + e.getClass().getSimpleName());
executors.add(e);
}
@@ -94,7 +94,8 @@ class LifecycleManagerImpl implements LifecycleManager {
c.createLocalState(txn);
duration = System.currentTimeMillis() - start;
if (LOG.isLoggable(INFO)) {
LOG.info("Starting client " + c.getClass().getName()
LOG.info("Starting client "
+ c.getClass().getSimpleName()
+ " took " + duration + " ms");
}
}
@@ -107,7 +108,7 @@ class LifecycleManagerImpl implements LifecycleManager {
s.startService();
duration = System.currentTimeMillis() - start;
if (LOG.isLoggable(INFO)) {
LOG.info("Starting service " + s.getClass().getName()
LOG.info("Starting service " + s.getClass().getSimpleName()
+ " took " + duration + " ms");
}
}
@@ -140,13 +141,17 @@ class LifecycleManagerImpl implements LifecycleManager {
s.stopService();
long duration = System.currentTimeMillis() - start;
if (LOG.isLoggable(INFO)) {
LOG.info("Stopping service " + s.getClass().getName()
LOG.info("Stopping service " + s.getClass().getSimpleName()
+ " took " + duration + " ms");
}
}
for (ExecutorService e : executors) e.shutdownNow();
if (LOG.isLoggable(INFO))
LOG.info(executors.size() + " executors shut down");
for (ExecutorService e : executors) {
if (LOG.isLoggable(INFO)) {
LOG.info("Stopping executor "
+ e.getClass().getSimpleName());
}
e.shutdownNow();
}
long start = System.currentTimeMillis();
db.close();
long duration = System.currentTimeMillis() - start;

View File

@@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -61,6 +62,8 @@ class PluginManagerImpl implements PluginManager, Service {
private final Map<TransportId, Plugin> plugins;
private final List<SimplexPlugin> simplexPlugins;
private final List<DuplexPlugin> duplexPlugins;
private final Map<TransportId, CountDownLatch> startLatches;
private final AtomicBoolean used = new AtomicBoolean(false);
@Inject
PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus,
@@ -78,68 +81,64 @@ class PluginManagerImpl implements PluginManager, Service {
plugins = new ConcurrentHashMap<TransportId, Plugin>();
simplexPlugins = new CopyOnWriteArrayList<SimplexPlugin>();
duplexPlugins = new CopyOnWriteArrayList<DuplexPlugin>();
startLatches = new ConcurrentHashMap<TransportId, CountDownLatch>();
}
@Override
public void startService() throws ServiceException {
Collection<SimplexPluginFactory> simplexFactories =
pluginConfig.getSimplexFactories();
Collection<DuplexPluginFactory> duplexFactories =
pluginConfig.getDuplexFactories();
int numPlugins = simplexFactories.size() + duplexFactories.size();
CountDownLatch latch = new CountDownLatch(numPlugins);
// Instantiate and start the simplex plugins
if (used.getAndSet(true)) throw new IllegalStateException();
// Instantiate the simplex plugins and start them asynchronously
LOG.info("Starting simplex plugins");
for (SimplexPluginFactory f : simplexFactories) {
for (SimplexPluginFactory f : pluginConfig.getSimplexFactories()) {
TransportId t = f.getId();
SimplexPlugin s = f.createPlugin(new SimplexCallback(t));
if (s == null) {
if (LOG.isLoggable(WARNING))
LOG.warning("Could not create plugin for " + t);
latch.countDown();
} else {
plugins.put(t, s);
simplexPlugins.add(s);
ioExecutor.execute(new PluginStarter(s, latch));
CountDownLatch startLatch = new CountDownLatch(1);
startLatches.put(t, startLatch);
ioExecutor.execute(new PluginStarter(s, startLatch));
}
}
// Instantiate and start the duplex plugins
// Instantiate the duplex plugins and start them asynchronously
LOG.info("Starting duplex plugins");
for (DuplexPluginFactory f : duplexFactories) {
for (DuplexPluginFactory f : pluginConfig.getDuplexFactories()) {
TransportId t = f.getId();
DuplexPlugin d = f.createPlugin(new DuplexCallback(t));
if (d == null) {
if (LOG.isLoggable(WARNING))
LOG.warning("Could not create plugin for " + t);
latch.countDown();
} else {
plugins.put(t, d);
duplexPlugins.add(d);
ioExecutor.execute(new PluginStarter(d, latch));
CountDownLatch startLatch = new CountDownLatch(1);
startLatches.put(t, startLatch);
ioExecutor.execute(new PluginStarter(d, startLatch));
}
}
// Wait for all the plugins to start
try {
latch.await();
} catch (InterruptedException e) {
throw new ServiceException(e);
}
}
@Override
public void stopService() throws ServiceException {
CountDownLatch latch = new CountDownLatch(plugins.size());
CountDownLatch stopLatch = new CountDownLatch(plugins.size());
// Stop the simplex plugins
LOG.info("Stopping simplex plugins");
for (SimplexPlugin plugin : simplexPlugins)
ioExecutor.execute(new PluginStopper(plugin, latch));
for (SimplexPlugin s : simplexPlugins) {
CountDownLatch startLatch = startLatches.get(s.getId());
ioExecutor.execute(new PluginStopper(s, startLatch, stopLatch));
}
// Stop the duplex plugins
LOG.info("Stopping duplex plugins");
for (DuplexPlugin plugin : duplexPlugins)
ioExecutor.execute(new PluginStopper(plugin, latch));
for (DuplexPlugin d : duplexPlugins) {
CountDownLatch startLatch = startLatches.get(d.getId());
ioExecutor.execute(new PluginStopper(d, startLatch, stopLatch));
}
// Wait for all the plugins to stop
try {
latch.await();
stopLatch.await();
} catch (InterruptedException e) {
throw new ServiceException(e);
}
@@ -179,11 +178,11 @@ class PluginManagerImpl implements PluginManager, Service {
private class PluginStarter implements Runnable {
private final Plugin plugin;
private final CountDownLatch latch;
private final CountDownLatch startLatch;
private PluginStarter(Plugin plugin, CountDownLatch latch) {
private PluginStarter(Plugin plugin, CountDownLatch startLatch) {
this.plugin = plugin;
this.latch = latch;
this.startLatch = startLatch;
}
@Override
@@ -209,7 +208,7 @@ class PluginManagerImpl implements PluginManager, Service {
LOG.log(WARNING, e.toString(), e);
}
} finally {
latch.countDown();
startLatch.countDown();
}
}
}
@@ -217,16 +216,21 @@ class PluginManagerImpl implements PluginManager, Service {
private class PluginStopper implements Runnable {
private final Plugin plugin;
private final CountDownLatch latch;
private final CountDownLatch startLatch, stopLatch;
private PluginStopper(Plugin plugin, CountDownLatch latch) {
private PluginStopper(Plugin plugin, CountDownLatch startLatch,
CountDownLatch stopLatch) {
this.plugin = plugin;
this.latch = latch;
this.startLatch = startLatch;
this.stopLatch = stopLatch;
}
@Override
public void run() {
try {
// Wait for the plugin to finish starting
startLatch.await();
// Stop the plugin
long start = System.currentTimeMillis();
plugin.stop();
long duration = System.currentTimeMillis() - start;
@@ -234,10 +238,13 @@ class PluginManagerImpl implements PluginManager, Service {
LOG.info("Stopping plugin " + plugin.getId()
+ " took " + duration + " ms");
}
} catch (InterruptedException e) {
LOG.warning("Interrupted while waiting for plugin to start");
// This task runs on an executor, so don't reset the interrupt
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
latch.countDown();
stopLatch.countDown();
}
}
}

View File

@@ -14,6 +14,7 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
@@ -27,6 +28,7 @@ public abstract class FilePlugin implements SimplexPlugin {
protected final Executor ioExecutor;
protected final SimplexPluginCallback callback;
protected final int maxLatency;
protected final AtomicBoolean used = new AtomicBoolean(false);
protected volatile boolean running = false;
@@ -42,22 +44,27 @@ public abstract class FilePlugin implements SimplexPlugin {
this.maxLatency = maxLatency;
}
@Override
public int getMaxLatency() {
return maxLatency;
}
@Override
public int getMaxIdleTime() {
return Integer.MAX_VALUE; // We don't need keepalives
}
@Override
public boolean isRunning() {
return running;
}
@Override
public TransportConnectionReader createReader(ContactId c) {
return null;
}
@Override
public TransportConnectionWriter createWriter(ContactId c) {
if (!running) return null;
return createWriter(createConnectionFilename());
@@ -105,6 +112,7 @@ public abstract class FilePlugin implements SimplexPlugin {
this.file = file;
}
@Override
public void run() {
if (isPossibleConnectionFilename(file.getName())) {
try {

View File

@@ -31,6 +31,7 @@ class LanTcpPlugin extends TcpPlugin {
super(ioExecutor, backoff, callback, maxLatency, maxIdleTime);
}
@Override
public TransportId getId() {
return ID;
}

View File

@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.regex.Pattern;
@@ -41,6 +42,7 @@ abstract class TcpPlugin implements DuplexPlugin {
protected final Backoff backoff;
protected final DuplexPluginCallback callback;
protected final int maxLatency, maxIdleTime, socketTimeout;
protected final AtomicBoolean used = new AtomicBoolean(false);
protected volatile boolean running = false;
protected volatile ServerSocket socket = null;
@@ -81,15 +83,19 @@ abstract class TcpPlugin implements DuplexPlugin {
else socketTimeout = maxIdleTime * 2;
}
@Override
public int getMaxLatency() {
return maxLatency;
}
@Override
public int getMaxIdleTime() {
return maxIdleTime;
}
@Override
public boolean start() {
if (used.getAndSet(true)) throw new IllegalStateException();
running = true;
bind();
return true;
@@ -97,6 +103,7 @@ abstract class TcpPlugin implements DuplexPlugin {
protected void bind() {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
if (!running) return;
ServerSocket ss = null;
@@ -166,23 +173,28 @@ abstract class TcpPlugin implements DuplexPlugin {
}
}
@Override
public void stop() {
running = false;
tryToClose(socket);
}
@Override
public boolean isRunning() {
return running && socket != null && !socket.isClosed();
}
@Override
public boolean shouldPoll() {
return true;
}
@Override
public int getPollingInterval() {
return backoff.getPollingInterval();
}
@Override
public void poll(Collection<ContactId> connected) {
if (!isRunning()) return;
backoff.increment();
@@ -193,6 +205,7 @@ abstract class TcpPlugin implements DuplexPlugin {
private void connectAndCallBack(final ContactId c) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
DuplexTransportConnection d = createConnection(c);
if (d != null) {
@@ -203,6 +216,7 @@ abstract class TcpPlugin implements DuplexPlugin {
});
}
@Override
public DuplexTransportConnection createConnection(ContactId c) {
if (!isRunning()) return null;
for (InetSocketAddress remote : getRemoteSocketAddresses(c)) {
@@ -250,24 +264,28 @@ abstract class TcpPlugin implements DuplexPlugin {
}
}
@Override
public boolean supportsInvitations() {
return false;
}
@Override
public DuplexTransportConnection createInvitationConnection(PseudoRandom r,
long timeout, boolean alice) {
throw new UnsupportedOperationException();
}
@Override
public boolean supportsKeyAgreement() {
return false;
}
public KeyAgreementListener createKeyAgreementListener(
byte[] commitment) {
@Override
public KeyAgreementListener createKeyAgreementListener(byte[] commitment) {
throw new UnsupportedOperationException();
}
@Override
public DuplexTransportConnection createKeyAgreementConnection(
byte[] commitment, TransportDescriptor d, long timeout) {
throw new UnsupportedOperationException();

View File

@@ -31,6 +31,7 @@ class WanTcpPlugin extends TcpPlugin {
this.portMapper = portMapper;
}
@Override
public TransportId getId() {
return ID;
}

View File

@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -44,6 +45,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private final Executor cryptoExecutor;
private final Map<ClientId, MessageValidator> validators;
private final Map<ClientId, IncomingMessageHook> hooks;
private final AtomicBoolean used = new AtomicBoolean(false);
@Inject
ValidationManagerImpl(DatabaseComponent db,
@@ -58,6 +60,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
@Override
public void startService() {
if (used.getAndSet(true)) throw new IllegalStateException();
for (ClientId c : validators.keySet()) getMessagesToValidate(c);
}
@@ -78,6 +81,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private void getMessagesToValidate(final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Queue<MessageId> unvalidated = new LinkedList<MessageId>();
@@ -100,6 +104,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private void validateNextMessage(final Queue<MessageId> unvalidated) {
if (unvalidated.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Message m = null;
@@ -141,6 +146,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private void validateMessage(final Message m, final Group g) {
cryptoExecutor.execute(new Runnable() {
@Override
public void run() {
MessageValidator v = validators.get(g.getClientId());
if (v == null) {
@@ -156,6 +162,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private void storeValidationResult(final Message m, final ClientId c,
final Metadata meta) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Transaction txn = db.startTransaction(false);
@@ -193,6 +200,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private void loadGroupAndValidate(final Message m) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Group g;

View File

@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -47,6 +48,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
private final Clock clock;
private final Map<ContactId, Boolean> activeContacts;
private final ConcurrentHashMap<TransportId, TransportKeyManager> managers;
private final AtomicBoolean used = new AtomicBoolean(false);
@Inject
KeyManagerImpl(DatabaseComponent db, CryptoComponent crypto,
@@ -66,6 +68,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
@Override
public void startService() throws ServiceException {
if (used.getAndSet(true)) throw new IllegalStateException();
Map<TransportId, Integer> transports =
new HashMap<TransportId, Integer>();
for (SimplexPluginFactory f : pluginConfig.getSimplexFactories())