Merge branch 'poller-refactoring' into 'master'

Poller refactoring, replace Timer with ScheduledExecutorService

* Replace Timer with ScheduledExecutorService (closes #258)
* Move automatic connection logic from PluginManager to Poller
* Reschedule polling when connections are opened or closed, making the poller more responsive to reductions in the polling interval


See merge request !180
This commit is contained in:
akwizgran
2016-05-11 14:45:50 +00:00
30 changed files with 876 additions and 648 deletions

View File

@@ -3,18 +3,13 @@ package org.briarproject.plugins;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.TransportDisabledEvent;
import org.briarproject.api.event.TransportEnabledEvent;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.Service;
import org.briarproject.api.lifecycle.ServiceException;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback;
import org.briarproject.api.plugins.PluginConfig;
@@ -51,7 +46,7 @@ import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
class PluginManagerImpl implements PluginManager, Service, EventListener {
class PluginManagerImpl implements PluginManager, Service {
private static final Logger LOG =
Logger.getLogger(PluginManagerImpl.class.getName());
@@ -59,9 +54,7 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
private final Executor ioExecutor;
private final EventBus eventBus;
private final PluginConfig pluginConfig;
private final Poller poller;
private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry;
private final SettingsManager settingsManager;
private final TransportPropertyManager transportPropertyManager;
private final UiCallback uiCallback;
@@ -71,18 +64,14 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
@Inject
PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus,
PluginConfig pluginConfig, Poller poller,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry,
PluginConfig pluginConfig, ConnectionManager connectionManager,
SettingsManager settingsManager,
TransportPropertyManager transportPropertyManager,
UiCallback uiCallback) {
this.ioExecutor = ioExecutor;
this.eventBus = eventBus;
this.pluginConfig = pluginConfig;
this.poller = poller;
this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.settingsManager = settingsManager;
this.transportPropertyManager = transportPropertyManager;
this.uiCallback = uiCallback;
@@ -93,39 +82,53 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
@Override
public void startService() throws ServiceException {
Collection<SimplexPluginFactory> simplexFactories =
pluginConfig.getSimplexFactories();
Collection<DuplexPluginFactory> duplexFactories =
pluginConfig.getDuplexFactories();
int numPlugins = simplexFactories.size() + duplexFactories.size();
CountDownLatch latch = new CountDownLatch(numPlugins);
// Instantiate and start the simplex plugins
LOG.info("Starting simplex plugins");
Collection<SimplexPluginFactory> sFactories =
pluginConfig.getSimplexFactories();
final CountDownLatch sLatch = new CountDownLatch(sFactories.size());
for (SimplexPluginFactory factory : sFactories)
ioExecutor.execute(new SimplexPluginStarter(factory, sLatch));
for (SimplexPluginFactory f : simplexFactories) {
TransportId t = f.getId();
SimplexPlugin s = f.createPlugin(new SimplexCallback(t));
if (s == null) {
if (LOG.isLoggable(WARNING))
LOG.warning("Could not create plugin for " + t);
latch.countDown();
} else {
plugins.put(t, s);
simplexPlugins.add(s);
ioExecutor.execute(new PluginStarter(s, latch));
}
}
// Instantiate and start the duplex plugins
LOG.info("Starting duplex plugins");
Collection<DuplexPluginFactory> dFactories =
pluginConfig.getDuplexFactories();
final CountDownLatch dLatch = new CountDownLatch(dFactories.size());
for (DuplexPluginFactory factory : dFactories)
ioExecutor.execute(new DuplexPluginStarter(factory, dLatch));
// Wait for the plugins to start
for (DuplexPluginFactory f : duplexFactories) {
TransportId t = f.getId();
DuplexPlugin d = f.createPlugin(new DuplexCallback(t));
if (d == null) {
if (LOG.isLoggable(WARNING))
LOG.warning("Could not create plugin for " + t);
latch.countDown();
} else {
plugins.put(t, d);
duplexPlugins.add(d);
ioExecutor.execute(new PluginStarter(d, latch));
}
}
// Wait for all the plugins to start
try {
sLatch.await();
dLatch.await();
latch.await();
} catch (InterruptedException e) {
throw new ServiceException(e);
}
// Listen for events
eventBus.addListener(this);
}
@Override
public void stopService() throws ServiceException {
// Stop listening for events
eventBus.removeListener(this);
// Stop the poller
LOG.info("Stopping poller");
poller.stop();
final CountDownLatch latch = new CountDownLatch(plugins.size());
CountDownLatch latch = new CountDownLatch(plugins.size());
// Stop the simplex plugins
LOG.info("Stopping simplex plugins");
for (SimplexPlugin plugin : simplexPlugins)
@@ -147,6 +150,16 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
return plugins.get(t);
}
@Override
public Collection<SimplexPlugin> getSimplexPlugins() {
return Collections.unmodifiableList(simplexPlugins);
}
@Override
public Collection<DuplexPlugin> getDuplexPlugins() {
return Collections.unmodifiableList(duplexPlugins);
}
@Override
public Collection<DuplexPlugin> getInvitationPlugins() {
List<DuplexPlugin> supported = new ArrayList<DuplexPlugin>();
@@ -163,158 +176,32 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
return Collections.unmodifiableList(supported);
}
@Override
public void eventOccurred(Event e) {
if (e instanceof ContactStatusChangedEvent) {
ContactStatusChangedEvent c = (ContactStatusChangedEvent) e;
if (c.isActive()) {
// Connect to the newly activated contact
connectToContact(c.getContactId());
}
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
}
}
private class PluginStarter implements Runnable {
private void connectToContact(ContactId c) {
for (SimplexPlugin s : simplexPlugins)
if (s.shouldPoll()) connectToContact(c, s);
for (DuplexPlugin d : duplexPlugins)
if (d.shouldPoll()) connectToContact(c, d);
}
private void connectToContact(ContactId c, TransportId t) {
Plugin p = plugins.get(t);
if (p instanceof SimplexPlugin && p.shouldPoll())
connectToContact(c, (SimplexPlugin) p);
else if (p instanceof DuplexPlugin && p.shouldPoll())
connectToContact(c, (DuplexPlugin) p);
}
private void connectToContact(final ContactId c, final SimplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
TransportConnectionWriter w = p.createWriter(c);
if (w != null)
connectionManager.manageOutgoingConnection(c, t, w);
}
}
});
}
private void connectToContact(final ContactId c, final DuplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
DuplexTransportConnection d = p.createConnection(c);
if (d != null)
connectionManager.manageOutgoingConnection(c, t, d);
}
}
});
}
private class SimplexPluginStarter implements Runnable {
private final SimplexPluginFactory factory;
private final Plugin plugin;
private final CountDownLatch latch;
private SimplexPluginStarter(SimplexPluginFactory factory,
CountDownLatch latch) {
this.factory = factory;
private PluginStarter(Plugin plugin, CountDownLatch latch) {
this.plugin = plugin;
this.latch = latch;
}
@Override
public void run() {
try {
TransportId id = factory.getId();
SimplexCallback callback = new SimplexCallback(id);
SimplexPlugin plugin = factory.createPlugin(callback);
if (plugin == null) {
if (LOG.isLoggable(INFO)) {
String name = factory.getClass().getSimpleName();
LOG.info(name + " did not create a plugin");
}
return;
}
try {
long start = System.currentTimeMillis();
boolean started = plugin.start();
long duration = System.currentTimeMillis() - start;
if (started) {
plugins.put(id, plugin);
simplexPlugins.add(plugin);
if (LOG.isLoggable(INFO)) {
String name = plugin.getClass().getSimpleName();
LOG.info("Starting " + name + " took " +
duration + " ms");
LOG.info("Starting plugin " + plugin.getId()
+ " took " + duration + " ms");
}
} else {
if (LOG.isLoggable(WARNING)) {
String name = plugin.getClass().getSimpleName();
LOG.warning(name + " did not start");
}
}
} catch (IOException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
} finally {
latch.countDown();
}
}
}
private class DuplexPluginStarter implements Runnable {
private final DuplexPluginFactory factory;
private final CountDownLatch latch;
private DuplexPluginStarter(DuplexPluginFactory factory,
CountDownLatch latch) {
this.factory = factory;
this.latch = latch;
}
@Override
public void run() {
try {
TransportId id = factory.getId();
DuplexCallback callback = new DuplexCallback(id);
DuplexPlugin plugin = factory.createPlugin(callback);
if (plugin == null) {
if (LOG.isLoggable(INFO)) {
String name = factory.getClass().getSimpleName();
LOG.info(name + " did not create a plugin");
}
return;
}
try {
long start = System.currentTimeMillis();
boolean started = plugin.start();
long duration = System.currentTimeMillis() - start;
if (started) {
plugins.put(id, plugin);
duplexPlugins.add(plugin);
if (LOG.isLoggable(INFO)) {
String name = plugin.getClass().getSimpleName();
LOG.info("Starting " + name + " took " +
duration + " ms");
}
} else {
if (LOG.isLoggable(WARNING)) {
String name = plugin.getClass().getSimpleName();
LOG.warning(name + " did not start");
LOG.warning("Plugin" + plugin.getId()
+ " did not start");
}
}
} catch (IOException e) {
@@ -344,8 +231,8 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
plugin.stop();
long duration = System.currentTimeMillis() - start;
if (LOG.isLoggable(INFO)) {
String name = plugin.getClass().getSimpleName();
LOG.info("Stopping " + name + " took " + duration + " ms");
LOG.info("Stopping plugin " + plugin.getId()
+ " took " + duration + " ms");
}
} catch (IOException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -431,8 +318,6 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
@Override
public void transportEnabled() {
eventBus.broadcast(new TransportEnabledEvent(id));
Plugin p = plugins.get(id);
if (p != null) poller.pollNow(p);
}
@Override

View File

@@ -7,14 +7,11 @@ import org.briarproject.api.plugins.BackoffFactory;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.sync.SyncSessionFactory;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.KeyManager;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriterFactory;
import org.briarproject.api.system.Clock;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -28,6 +25,8 @@ public class PluginsModule {
public static class EagerSingletons {
@Inject
PluginManager pluginManager;
@Inject
Poller poller;
}
@Provides
@@ -36,33 +35,35 @@ public class PluginsModule {
}
@Provides
@Singleton
Poller providePoller(@IoExecutor Executor ioExecutor,
ConnectionRegistry connectionRegistry, SecureRandom random,
Timer timer) {
return new PollerImpl(ioExecutor, connectionRegistry, random, timer);
ScheduledExecutorService scheduler,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, PluginManager pluginManager,
SecureRandom random, Clock clock, EventBus eventBus) {
Poller poller = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random, clock);
eventBus.addListener(poller);
return poller;
}
@Provides
@Singleton
ConnectionManager provideConnectionManager(
@IoExecutor Executor ioExecutor, KeyManager keyManager,
StreamReaderFactory streamReaderFactory,
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
ConnectionRegistry connectionRegistry) {
return new ConnectionManagerImpl(ioExecutor, keyManager,
streamReaderFactory, streamWriterFactory, syncSessionFactory,
connectionRegistry);
ConnectionManagerImpl connectionManager) {
return connectionManager;
}
@Provides
@Singleton
ConnectionRegistry provideConnectionRegistry(EventBus eventBus) {
return new ConnectionRegistryImpl(eventBus);
ConnectionRegistry provideConnectionRegistry(
ConnectionRegistryImpl connectionRegistry) {
return connectionRegistry;
}
@Provides
@Singleton
PluginManager getPluginManager(LifecycleManager lifecycleManager,
PluginManager providePluginManager(LifecycleManager lifecycleManager,
PluginManagerImpl pluginManager) {
lifecycleManager.registerService(pluginManager);
return pluginManager;

View File

@@ -1,12 +1,203 @@
package org.briarproject.plugins;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ConnectionOpenedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.TransportEnabledEvent;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.briarproject.api.system.Clock;
interface Poller {
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
/** Tells the poller to poll the given plugin immediately. */
void pollNow(Plugin p);
import javax.inject.Inject;
/** Stops the poller. */
void stop();
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
class Poller implements EventListener {
private static final Logger LOG = Logger.getLogger(Poller.class.getName());
private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry;
private final PluginManager pluginManager;
private final SecureRandom random;
private final Clock clock;
private final Lock lock;
private final Map<TransportId, PollTask> tasks; // Locking: lock
@Inject
Poller(@IoExecutor Executor ioExecutor, ScheduledExecutorService scheduler,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, PluginManager pluginManager,
SecureRandom random, Clock clock) {
this.ioExecutor = ioExecutor;
this.scheduler = scheduler;
this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.pluginManager = pluginManager;
this.random = random;
this.clock = clock;
lock = new ReentrantLock();
tasks = new HashMap<TransportId, PollTask>();
}
@Override
public void eventOccurred(Event e) {
if (e instanceof ContactStatusChangedEvent) {
ContactStatusChangedEvent c = (ContactStatusChangedEvent) e;
if (c.isActive()) {
// Connect to the newly activated contact
connectToContact(c.getContactId());
}
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
} else if (e instanceof ConnectionOpenedEvent) {
ConnectionOpenedEvent c = (ConnectionOpenedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
} else if (e instanceof TransportEnabledEvent) {
TransportEnabledEvent t = (TransportEnabledEvent) e;
// Poll the newly enabled transport
pollNow(t.getTransportId());
}
}
private void connectToContact(ContactId c) {
for (SimplexPlugin s : pluginManager.getSimplexPlugins())
if (s.shouldPoll()) connectToContact(c, s);
for (DuplexPlugin d : pluginManager.getDuplexPlugins())
if (d.shouldPoll()) connectToContact(c, d);
}
private void connectToContact(ContactId c, TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p instanceof SimplexPlugin && p.shouldPoll())
connectToContact(c, (SimplexPlugin) p);
else if (p instanceof DuplexPlugin && p.shouldPoll())
connectToContact(c, (DuplexPlugin) p);
}
private void connectToContact(final ContactId c, final SimplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
TransportConnectionWriter w = p.createWriter(c);
if (w != null)
connectionManager.manageOutgoingConnection(c, t, w);
}
}
});
}
private void connectToContact(final ContactId c, final DuplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
DuplexTransportConnection d = p.createConnection(c);
if (d != null)
connectionManager.manageOutgoingConnection(c, t, d);
}
}
});
}
private void reschedule(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p.shouldPoll()) schedule(p, p.getPollingInterval(), false);
}
private void pollNow(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
// Randomise next polling interval
if (p.shouldPoll()) schedule(p, 0, true);
}
private void schedule(Plugin p, int delay, boolean randomiseNext) {
// Replace any later scheduled task for this plugin
long due = clock.currentTimeMillis() + delay;
TransportId t = p.getId();
lock.lock();
try {
PollTask scheduled = tasks.get(t);
if (scheduled == null || due < scheduled.due) {
PollTask task = new PollTask(p, due, randomiseNext);
tasks.put(t, task);
scheduler.schedule(task, delay, MILLISECONDS);
}
} finally {
lock.unlock();
}
}
private void poll(final Plugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (LOG.isLoggable(INFO)) LOG.info("Polling plugin " + t);
p.poll(connectionRegistry.getConnectedContacts(t));
}
});
}
private class PollTask implements Runnable {
private final Plugin plugin;
private final long due;
private final boolean randomiseNext;
private PollTask(Plugin plugin, long due, boolean randomiseNext) {
this.plugin = plugin;
this.due = due;
this.randomiseNext = randomiseNext;
}
@Override
public void run() {
lock.lock();
try {
TransportId t = plugin.getId();
if (tasks.get(t) != this) return; // Replaced by another task
tasks.remove(t);
} finally {
lock.unlock();
}
int delay = plugin.getPollingInterval();
if (randomiseNext) delay = (int) (delay * random.nextDouble());
schedule(plugin, delay, false);
poll(plugin);
}
}
}

View File

@@ -1,92 +0,0 @@
package org.briarproject.plugins;
import org.briarproject.api.TransportId;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.system.Timer;
import java.security.SecureRandom;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
class PollerImpl implements Poller {
private static final Logger LOG =
Logger.getLogger(PollerImpl.class.getName());
private final Executor ioExecutor;
private final ConnectionRegistry connectionRegistry;
private final SecureRandom random;
private final Timer timer;
private final Map<TransportId, PollTask> tasks;
@Inject
PollerImpl(@IoExecutor Executor ioExecutor,
ConnectionRegistry connectionRegistry, SecureRandom random,
Timer timer) {
this.ioExecutor = ioExecutor;
this.connectionRegistry = connectionRegistry;
this.random = random;
this.timer = timer;
tasks = new ConcurrentHashMap<TransportId, PollTask>();
}
@Override
public void stop() {
timer.cancel();
}
@Override
public void pollNow(Plugin p) {
// Randomise next polling interval
if (p.shouldPoll()) schedule(p, 0, true);
}
private void schedule(Plugin p, int interval, boolean randomiseNext) {
// Replace any previously scheduled task for this plugin
PollTask task = new PollTask(p, randomiseNext);
PollTask replaced = tasks.put(p.getId(), task);
if (replaced != null) replaced.cancel();
timer.schedule(task, interval);
}
private void poll(final Plugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
if (LOG.isLoggable(INFO))
LOG.info("Polling " + p.getClass().getSimpleName());
p.poll(connectionRegistry.getConnectedContacts(p.getId()));
}
});
}
private class PollTask extends TimerTask {
private final Plugin plugin;
private final boolean randomiseNext;
private PollTask(Plugin plugin, boolean randomiseNext) {
this.plugin = plugin;
this.randomiseNext = randomiseNext;
}
@Override
public void run() {
tasks.remove(plugin.getId());
int interval = plugin.getPollingInterval();
if (randomiseNext)
interval = (int) (interval * random.nextDouble());
schedule(plugin, interval, false);
poll(plugin);
}
}
}