Start and stop plugins in parallel for faster startup and shutdown.

This commit is contained in:
akwizgran
2013-04-25 19:34:46 +01:00
parent ee641db600
commit 80ac368cab

View File

@@ -7,11 +7,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
@@ -54,8 +55,8 @@ class PluginManagerImpl implements PluginManager {
private final Poller poller;
private final ConnectionDispatcher dispatcher;
private final UiCallback uiCallback;
private final List<SimplexPlugin> simplexPlugins; // Locking: this
private final List<DuplexPlugin> duplexPlugins; // Locking: this
private final List<SimplexPlugin> simplexPlugins;
private final List<DuplexPlugin> duplexPlugins;
@Inject
PluginManagerImpl(@PluginExecutor ExecutorService pluginExecutor,
@@ -72,83 +73,36 @@ class PluginManagerImpl implements PluginManager {
this.poller = poller;
this.dispatcher = dispatcher;
this.uiCallback = uiCallback;
simplexPlugins = new ArrayList<SimplexPlugin>();
duplexPlugins = new ArrayList<DuplexPlugin>();
simplexPlugins = new CopyOnWriteArrayList<SimplexPlugin>();
duplexPlugins = new CopyOnWriteArrayList<DuplexPlugin>();
}
public synchronized int start() {
Set<TransportId> ids = new HashSet<TransportId>();
// Instantiate and start the simplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Starting simplex plugins");
for(SimplexPluginFactory factory : simplexPluginConfig.getFactories()) {
TransportId id = factory.getId();
if(!ids.add(id)) {
if(LOG.isLoggable(WARNING))
LOG.warning("Duplicate transport ID: " + id);
continue;
}
SimplexCallback callback = new SimplexCallback(id);
SimplexPlugin plugin = factory.createPlugin(callback);
if(plugin == null) {
if(LOG.isLoggable(INFO)) {
LOG.info(factory.getClass().getSimpleName()
+ " did not create a plugin");
}
continue;
}
try {
db.addTransport(id, plugin.getMaxLatency());
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
continue;
}
try {
if(plugin.start()) {
simplexPlugins.add(plugin);
} else {
if(LOG.isLoggable(INFO))
LOG.info(plugin.getClass().getSimpleName()
+ " did not start");
}
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
Collection<SimplexPluginFactory> sFactories =
simplexPluginConfig.getFactories();
final CountDownLatch sLatch = new CountDownLatch(sFactories.size());
for(SimplexPluginFactory factory : sFactories) {
pluginExecutor.execute(new SimplexPluginStarter(factory, sLatch));
}
// Instantiate and start the duplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Starting duplex plugins");
for(DuplexPluginFactory factory : duplexPluginConfig.getFactories()) {
TransportId id = factory.getId();
if(!ids.add(id)) {
if(LOG.isLoggable(WARNING))
LOG.warning("Duplicate transport ID: " + id);
continue;
}
DuplexCallback callback = new DuplexCallback(id);
DuplexPlugin plugin = factory.createPlugin(callback);
if(plugin == null) {
if(LOG.isLoggable(INFO)) {
LOG.info(factory.getClass().getSimpleName()
+ " did not create a plugin");
}
continue;
}
try {
db.addTransport(id, plugin.getMaxLatency());
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
continue;
}
try {
if(plugin.start()) {
duplexPlugins.add(plugin);
} else {
if(LOG.isLoggable(INFO))
LOG.info(plugin.getClass().getSimpleName()
+ " did not start");
}
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
Collection<DuplexPluginFactory> dFactories =
duplexPluginConfig.getFactories();
final CountDownLatch dLatch = new CountDownLatch(dFactories.size());
for(DuplexPluginFactory factory : dFactories) {
pluginExecutor.execute(new DuplexPluginStarter(factory, dLatch));
}
// Wait for the plugins to start
try {
sLatch.await();
dLatch.await();
} catch(InterruptedException e) {
if(LOG.isLoggable(WARNING))
LOG.warning("Interrupted while starting plugins");
Thread.currentThread().interrupt();
return 0;
}
// Start the poller
if(LOG.isLoggable(INFO)) LOG.info("Starting poller");
@@ -157,51 +111,175 @@ class PluginManagerImpl implements PluginManager {
plugins.addAll(duplexPlugins);
poller.start(Collections.unmodifiableList(plugins));
// Return the number of plugins successfully started
return simplexPlugins.size() + duplexPlugins.size();
return plugins.size();
}
public synchronized int stop() {
int stopped = 0;
// Stop the poller
if(LOG.isLoggable(INFO)) LOG.info("Stopping poller");
poller.stop();
final AtomicInteger stopped = new AtomicInteger(0);
int plugins = simplexPlugins.size() + duplexPlugins.size();
final CountDownLatch latch = new CountDownLatch(plugins);
// Stop the simplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Stopping simplex plugins");
for(SimplexPlugin plugin : simplexPlugins) {
try {
plugin.stop();
stopped++;
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
pluginExecutor.execute(new PluginStopper(plugin, latch, stopped));
}
simplexPlugins.clear();
// Stop the duplex plugins
if(LOG.isLoggable(INFO)) LOG.info("Stopping duplex plugins");
for(DuplexPlugin plugin : duplexPlugins) {
try {
plugin.stop();
stopped++;
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
pluginExecutor.execute(new PluginStopper(plugin, latch, stopped));
}
simplexPlugins.clear();
duplexPlugins.clear();
// Wait for all the plugins to stop
try {
latch.await();
} catch(InterruptedException e) {
if(LOG.isLoggable(WARNING))
LOG.warning("Interrupted while stopping plugins");
Thread.currentThread().interrupt();
return 0;
}
// Shut down the executors
if(LOG.isLoggable(INFO)) LOG.info("Stopping executors");
pluginExecutor.shutdown();
androidExecutor.shutdown();
// Return the number of plugins successfully stopped
return stopped;
return stopped.get();
}
public synchronized Collection<DuplexPlugin> getInvitationPlugins() {
public Collection<DuplexPlugin> getInvitationPlugins() {
List<DuplexPlugin> supported = new ArrayList<DuplexPlugin>();
for(DuplexPlugin d : duplexPlugins)
if(d.supportsInvitations()) supported.add(d);
return Collections.unmodifiableList(supported);
}
private class SimplexPluginStarter implements Runnable {
private final SimplexPluginFactory factory;
private final CountDownLatch latch;
private SimplexPluginStarter(SimplexPluginFactory factory,
CountDownLatch latch) {
this.factory = factory;
this.latch = latch;
}
public void run() {
try {
TransportId id = factory.getId();
SimplexCallback callback = new SimplexCallback(id);
SimplexPlugin plugin = factory.createPlugin(callback);
if(plugin == null) {
if(LOG.isLoggable(INFO)) {
String name = factory.getClass().getSimpleName();
LOG.info(name + " did not create a plugin");
}
return;
}
try {
db.addTransport(id, plugin.getMaxLatency());
} catch(DbException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
return;
}
try {
if(plugin.start()) {
simplexPlugins.add(plugin);
} else {
if(LOG.isLoggable(INFO)) {
String name = plugin.getClass().getSimpleName();
LOG.info(name + " did not start");
}
}
} catch(IOException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
} finally {
latch.countDown();
}
}
}
private class DuplexPluginStarter implements Runnable {
private final DuplexPluginFactory factory;
private final CountDownLatch latch;
private DuplexPluginStarter(DuplexPluginFactory factory,
CountDownLatch latch) {
this.factory = factory;
this.latch = latch;
}
public void run() {
try {
TransportId id = factory.getId();
DuplexCallback callback = new DuplexCallback(id);
DuplexPlugin plugin = factory.createPlugin(callback);
if(plugin == null) {
if(LOG.isLoggable(INFO)) {
String name = factory.getClass().getSimpleName();
LOG.info(name + " did not create a plugin");
}
return;
}
try {
db.addTransport(id, plugin.getMaxLatency());
} catch(DbException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
return;
}
try {
if(plugin.start()) {
duplexPlugins.add(plugin);
} else {
if(LOG.isLoggable(INFO)) {
String name = plugin.getClass().getSimpleName();
LOG.info(name + " did not start");
}
}
} catch(IOException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
} finally {
latch.countDown();
}
}
}
private class PluginStopper implements Runnable {
private final Plugin plugin;
private final CountDownLatch latch;
private final AtomicInteger stopped;
private PluginStopper(Plugin plugin, CountDownLatch latch,
AtomicInteger stopped) {
this.plugin = plugin;
this.latch = latch;
this.stopped = stopped;
}
public void run() {
try {
plugin.stop();
stopped.incrementAndGet();
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
latch.countDown();
}
}
}
private abstract class PluginCallbackImpl implements PluginCallback {
protected final TransportId id;