From 4fcce7116c0b083ae9bb0bd57550712a2155027c Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 14 May 2019 17:51:54 +0100 Subject: [PATCH] Decouple poller from plugin manager. --- .../bramble/plugin/PluginManagerImpl.java | 29 +- .../bramble/plugin/PluginModule.java | 12 + .../briarproject/bramble/plugin/Poller.java | 260 +---------------- .../bramble/plugin/PollerImpl.java | 264 ++++++++++++++++++ .../bramble/plugin/PluginManagerImplTest.java | 16 +- .../{PollerTest.java => PollerImplTest.java} | 68 ++--- 6 files changed, 308 insertions(+), 341 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java rename bramble-core/src/test/java/org/briarproject/bramble/plugin/{PollerTest.java => PollerImplTest.java} (88%) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginManagerImpl.java index 34fecf711..c935ec6ac 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginManagerImpl.java @@ -8,7 +8,6 @@ import org.briarproject.bramble.api.lifecycle.Service; import org.briarproject.bramble.api.lifecycle.ServiceException; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.ConnectionManager; -import org.briarproject.bramble.api.plugin.ConnectionRegistry; import org.briarproject.bramble.api.plugin.Plugin; import org.briarproject.bramble.api.plugin.PluginCallback; import org.briarproject.bramble.api.plugin.PluginConfig; @@ -30,10 +29,7 @@ import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.settings.Settings; import org.briarproject.bramble.api.settings.SettingsManager; -import org.briarproject.bramble.api.system.Clock; -import org.briarproject.bramble.api.system.Scheduler; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -42,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -64,15 +59,11 @@ class PluginManagerImpl implements PluginManager, Service { Logger.getLogger(PluginManagerImpl.class.getName()); private final Executor ioExecutor; - private final ScheduledExecutorService scheduler; private final EventBus eventBus; private final PluginConfig pluginConfig; private final ConnectionManager connectionManager; - private final ConnectionRegistry connectionRegistry; private final SettingsManager settingsManager; private final TransportPropertyManager transportPropertyManager; - private final SecureRandom random; - private final Clock clock; private final Map plugins; private final List simplexPlugins; private final List duplexPlugins; @@ -80,41 +71,25 @@ class PluginManagerImpl implements PluginManager, Service { private final AtomicBoolean used = new AtomicBoolean(false); @Inject - PluginManagerImpl(@IoExecutor Executor ioExecutor, - @Scheduler ScheduledExecutorService scheduler, EventBus eventBus, + PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus, PluginConfig pluginConfig, ConnectionManager connectionManager, - ConnectionRegistry connectionRegistry, SettingsManager settingsManager, - TransportPropertyManager transportPropertyManager, - SecureRandom random, Clock clock) { + TransportPropertyManager transportPropertyManager) { this.ioExecutor = ioExecutor; - this.scheduler = scheduler; this.eventBus = eventBus; this.pluginConfig = pluginConfig; this.connectionManager = connectionManager; - this.connectionRegistry = connectionRegistry; this.settingsManager = settingsManager; this.transportPropertyManager = transportPropertyManager; - this.random = random; - this.clock = clock; plugins = new ConcurrentHashMap<>(); simplexPlugins = new CopyOnWriteArrayList<>(); duplexPlugins = new CopyOnWriteArrayList<>(); startLatches = new ConcurrentHashMap<>(); - } @Override public void startService() { if (used.getAndSet(true)) throw new IllegalStateException(); - // Instantiate the poller - if (pluginConfig.shouldPoll()) { - LOG.info("Starting poller"); - Poller poller = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, this, transportPropertyManager, random, - clock); - eventBus.addListener(poller); - } // Instantiate the simplex plugins and start them asynchronously LOG.info("Starting simplex plugins"); for (SimplexPluginFactory f : pluginConfig.getSimplexFactories()) { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java index 499da88b9..bca57cc1e 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PluginModule.java @@ -1,9 +1,11 @@ package org.briarproject.bramble.plugin; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.plugin.BackoffFactory; import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.PluginManager; import javax.inject.Inject; @@ -18,6 +20,8 @@ public class PluginModule { public static class EagerSingletons { @Inject PluginManager pluginManager; + @Inject + Poller poller; } @Provides @@ -46,4 +50,12 @@ public class PluginModule { lifecycleManager.registerService(pluginManager); return pluginManager; } + + @Provides + @Singleton + Poller providePoller(PluginConfig config, EventBus eventBus, + PollerImpl poller) { + if (config.shouldPoll()) eventBus.addListener(poller); + return poller; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java index 6d758f1c9..ede44cb48 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java @@ -1,259 +1,7 @@ package org.briarproject.bramble.plugin; -import org.briarproject.bramble.api.contact.ContactId; -import org.briarproject.bramble.api.contact.event.ContactAddedEvent; -import org.briarproject.bramble.api.db.DbException; -import org.briarproject.bramble.api.event.Event; -import org.briarproject.bramble.api.event.EventListener; -import org.briarproject.bramble.api.lifecycle.IoExecutor; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.plugin.ConnectionManager; -import org.briarproject.bramble.api.plugin.ConnectionRegistry; -import org.briarproject.bramble.api.plugin.Plugin; -import org.briarproject.bramble.api.plugin.PluginManager; -import org.briarproject.bramble.api.plugin.TransportConnectionWriter; -import org.briarproject.bramble.api.plugin.TransportId; -import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; -import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; -import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; -import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; -import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent; -import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent; -import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; -import org.briarproject.bramble.api.properties.TransportProperties; -import org.briarproject.bramble.api.properties.TransportPropertyManager; -import org.briarproject.bramble.api.system.Clock; -import org.briarproject.bramble.api.system.Scheduler; - -import java.security.SecureRandom; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Logger; - -import javax.annotation.concurrent.ThreadSafe; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.logging.Level.INFO; -import static java.util.logging.Level.WARNING; -import static org.briarproject.bramble.util.LogUtils.logException; - -@ThreadSafe -@NotNullByDefault -class Poller implements EventListener { - - private static final Logger LOG = Logger.getLogger(Poller.class.getName()); - - private final Executor ioExecutor; - private final ScheduledExecutorService scheduler; - private final ConnectionManager connectionManager; - private final ConnectionRegistry connectionRegistry; - private final PluginManager pluginManager; - private final TransportPropertyManager transportPropertyManager; - private final SecureRandom random; - private final Clock clock; - private final Lock lock; - private final Map tasks; // Locking: lock - - Poller(@IoExecutor Executor ioExecutor, - @Scheduler ScheduledExecutorService scheduler, - ConnectionManager connectionManager, - ConnectionRegistry connectionRegistry, PluginManager pluginManager, - TransportPropertyManager transportPropertyManager, - SecureRandom random, Clock clock) { - this.ioExecutor = ioExecutor; - this.scheduler = scheduler; - this.connectionManager = connectionManager; - this.connectionRegistry = connectionRegistry; - this.pluginManager = pluginManager; - this.transportPropertyManager = transportPropertyManager; - this.random = random; - this.clock = clock; - lock = new ReentrantLock(); - tasks = new HashMap<>(); - } - - @Override - public void eventOccurred(Event e) { - if (e instanceof ContactAddedEvent) { - ContactAddedEvent c = (ContactAddedEvent) e; - // Connect to the newly activated contact - connectToContact(c.getContactId()); - } else if (e instanceof ConnectionClosedEvent) { - ConnectionClosedEvent c = (ConnectionClosedEvent) e; - // Reschedule polling, the polling interval may have decreased - reschedule(c.getTransportId()); - if (!c.isIncoming()) { - // Connect to the disconnected contact - connectToContact(c.getContactId(), c.getTransportId()); - } - } else if (e instanceof ConnectionOpenedEvent) { - ConnectionOpenedEvent c = (ConnectionOpenedEvent) e; - // Reschedule polling, the polling interval may have decreased - reschedule(c.getTransportId()); - } else if (e instanceof TransportEnabledEvent) { - TransportEnabledEvent t = (TransportEnabledEvent) e; - // Poll the newly enabled transport - pollNow(t.getTransportId()); - } else if (e instanceof TransportDisabledEvent) { - TransportDisabledEvent t = (TransportDisabledEvent) e; - // Cancel polling for the disabled transport - cancel(t.getTransportId()); - } - } - - private void connectToContact(ContactId c) { - for (SimplexPlugin s : pluginManager.getSimplexPlugins()) - if (s.shouldPoll()) connectToContact(c, s); - for (DuplexPlugin d : pluginManager.getDuplexPlugins()) - if (d.shouldPoll()) connectToContact(c, d); - } - - private void connectToContact(ContactId c, TransportId t) { - Plugin p = pluginManager.getPlugin(t); - if (p instanceof SimplexPlugin && p.shouldPoll()) - connectToContact(c, (SimplexPlugin) p); - else if (p instanceof DuplexPlugin && p.shouldPoll()) - connectToContact(c, (DuplexPlugin) p); - } - - private void connectToContact(ContactId c, SimplexPlugin p) { - ioExecutor.execute(() -> { - TransportId t = p.getId(); - if (connectionRegistry.isConnected(c, t)) return; - try { - TransportProperties props = - transportPropertyManager.getRemoteProperties(c, t); - TransportConnectionWriter w = p.createWriter(props); - if (w != null) - connectionManager.manageOutgoingConnection(c, t, w); - } catch (DbException e) { - logException(LOG, WARNING, e); - } - }); - } - - private void connectToContact(ContactId c, DuplexPlugin p) { - ioExecutor.execute(() -> { - TransportId t = p.getId(); - if (connectionRegistry.isConnected(c, t)) return; - try { - TransportProperties props = - transportPropertyManager.getRemoteProperties(c, t); - DuplexTransportConnection d = p.createConnection(props); - if (d != null) - connectionManager.manageOutgoingConnection(c, t, d); - } catch (DbException e) { - logException(LOG, WARNING, e); - } - }); - } - - private void reschedule(TransportId t) { - Plugin p = pluginManager.getPlugin(t); - if (p != null && p.shouldPoll()) - schedule(p, p.getPollingInterval(), false); - } - - private void pollNow(TransportId t) { - Plugin p = pluginManager.getPlugin(t); - // Randomise next polling interval - if (p != null && p.shouldPoll()) schedule(p, 0, true); - } - - private void schedule(Plugin p, int delay, boolean randomiseNext) { - // Replace any later scheduled task for this plugin - long due = clock.currentTimeMillis() + delay; - TransportId t = p.getId(); - lock.lock(); - try { - ScheduledPollTask scheduled = tasks.get(t); - if (scheduled == null || due < scheduled.task.due) { - // If a later task exists, cancel it. If it's already started - // it will abort safely when it finds it's been replaced - if (scheduled != null) scheduled.future.cancel(false); - PollTask task = new PollTask(p, due, randomiseNext); - Future future = scheduler.schedule( - () -> ioExecutor.execute(task), delay, MILLISECONDS); - tasks.put(t, new ScheduledPollTask(task, future)); - } - } finally { - lock.unlock(); - } - } - - private void cancel(TransportId t) { - lock.lock(); - try { - ScheduledPollTask scheduled = tasks.remove(t); - if (scheduled != null) scheduled.future.cancel(false); - } finally { - lock.unlock(); - } - } - - @IoExecutor - private void poll(Plugin p) { - TransportId t = p.getId(); - if (LOG.isLoggable(INFO)) LOG.info("Polling plugin " + t); - try { - Map remote = - transportPropertyManager.getRemoteProperties(t); - Collection connected = - connectionRegistry.getConnectedContacts(t); - remote = new HashMap<>(remote); - remote.keySet().removeAll(connected); - if (!remote.isEmpty()) p.poll(remote); - } catch (DbException e) { - logException(LOG, WARNING, e); - } - } - - private class ScheduledPollTask { - - private final PollTask task; - private final Future future; - - private ScheduledPollTask(PollTask task, Future future) { - this.task = task; - this.future = future; - } - } - - private class PollTask implements Runnable { - - private final Plugin plugin; - private final long due; - private final boolean randomiseNext; - - private PollTask(Plugin plugin, long due, boolean randomiseNext) { - this.plugin = plugin; - this.due = due; - this.randomiseNext = randomiseNext; - } - - @Override - @IoExecutor - public void run() { - lock.lock(); - try { - TransportId t = plugin.getId(); - ScheduledPollTask scheduled = tasks.get(t); - if (scheduled != null && scheduled.task != this) - return; // Replaced by another task - tasks.remove(t); - } finally { - lock.unlock(); - } - int delay = plugin.getPollingInterval(); - if (randomiseNext) delay = (int) (delay * random.nextDouble()); - schedule(plugin, delay, false); - poll(plugin); - } - } +/** + * Empty interface used for injecting the poller. + */ +interface Poller { } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java new file mode 100644 index 000000000..d807f6044 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java @@ -0,0 +1,264 @@ +package org.briarproject.bramble.plugin; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.event.ContactAddedEvent; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventListener; +import org.briarproject.bramble.api.lifecycle.IoExecutor; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionManager; +import org.briarproject.bramble.api.plugin.ConnectionRegistry; +import org.briarproject.bramble.api.plugin.Plugin; +import org.briarproject.bramble.api.plugin.PluginManager; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; +import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; +import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent; +import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent; +import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; +import org.briarproject.bramble.api.properties.TransportProperties; +import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.system.Scheduler; + +import java.security.SecureRandom; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.util.LogUtils.logException; + +@ThreadSafe +@NotNullByDefault +class PollerImpl implements Poller, EventListener { + + private static final Logger LOG = getLogger(PollerImpl.class.getName()); + + private final Executor ioExecutor; + private final ScheduledExecutorService scheduler; + private final ConnectionManager connectionManager; + private final ConnectionRegistry connectionRegistry; + private final PluginManager pluginManager; + private final TransportPropertyManager transportPropertyManager; + private final SecureRandom random; + private final Clock clock; + private final Lock lock; + @GuardedBy("lock") + private final Map tasks; + + @Inject + PollerImpl(@IoExecutor Executor ioExecutor, + @Scheduler ScheduledExecutorService scheduler, + ConnectionManager connectionManager, + ConnectionRegistry connectionRegistry, PluginManager pluginManager, + TransportPropertyManager transportPropertyManager, + SecureRandom random, Clock clock) { + this.ioExecutor = ioExecutor; + this.scheduler = scheduler; + this.connectionManager = connectionManager; + this.connectionRegistry = connectionRegistry; + this.pluginManager = pluginManager; + this.transportPropertyManager = transportPropertyManager; + this.random = random; + this.clock = clock; + lock = new ReentrantLock(); + tasks = new HashMap<>(); + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof ContactAddedEvent) { + ContactAddedEvent c = (ContactAddedEvent) e; + // Connect to the newly added contact + connectToContact(c.getContactId()); + } else if (e instanceof ConnectionClosedEvent) { + ConnectionClosedEvent c = (ConnectionClosedEvent) e; + // Reschedule polling, the polling interval may have decreased + reschedule(c.getTransportId()); + if (!c.isIncoming()) { + // Connect to the disconnected contact + connectToContact(c.getContactId(), c.getTransportId()); + } + } else if (e instanceof ConnectionOpenedEvent) { + ConnectionOpenedEvent c = (ConnectionOpenedEvent) e; + // Reschedule polling, the polling interval may have decreased + reschedule(c.getTransportId()); + } else if (e instanceof TransportEnabledEvent) { + TransportEnabledEvent t = (TransportEnabledEvent) e; + // Poll the newly enabled transport + pollNow(t.getTransportId()); + } else if (e instanceof TransportDisabledEvent) { + TransportDisabledEvent t = (TransportDisabledEvent) e; + // Cancel polling for the disabled transport + cancel(t.getTransportId()); + } + } + + private void connectToContact(ContactId c) { + for (SimplexPlugin s : pluginManager.getSimplexPlugins()) + if (s.shouldPoll()) connectToContact(c, s); + for (DuplexPlugin d : pluginManager.getDuplexPlugins()) + if (d.shouldPoll()) connectToContact(c, d); + } + + private void connectToContact(ContactId c, TransportId t) { + Plugin p = pluginManager.getPlugin(t); + if (p instanceof SimplexPlugin && p.shouldPoll()) + connectToContact(c, (SimplexPlugin) p); + else if (p instanceof DuplexPlugin && p.shouldPoll()) + connectToContact(c, (DuplexPlugin) p); + } + + private void connectToContact(ContactId c, SimplexPlugin p) { + ioExecutor.execute(() -> { + TransportId t = p.getId(); + if (connectionRegistry.isConnected(c, t)) return; + try { + TransportProperties props = + transportPropertyManager.getRemoteProperties(c, t); + TransportConnectionWriter w = p.createWriter(props); + if (w != null) + connectionManager.manageOutgoingConnection(c, t, w); + } catch (DbException e) { + logException(LOG, WARNING, e); + } + }); + } + + private void connectToContact(ContactId c, DuplexPlugin p) { + ioExecutor.execute(() -> { + TransportId t = p.getId(); + if (connectionRegistry.isConnected(c, t)) return; + try { + TransportProperties props = + transportPropertyManager.getRemoteProperties(c, t); + DuplexTransportConnection d = p.createConnection(props); + if (d != null) + connectionManager.manageOutgoingConnection(c, t, d); + } catch (DbException e) { + logException(LOG, WARNING, e); + } + }); + } + + private void reschedule(TransportId t) { + Plugin p = pluginManager.getPlugin(t); + if (p != null && p.shouldPoll()) + schedule(p, p.getPollingInterval(), false); + } + + private void pollNow(TransportId t) { + Plugin p = pluginManager.getPlugin(t); + // Randomise next polling interval + if (p != null && p.shouldPoll()) schedule(p, 0, true); + } + + private void schedule(Plugin p, int delay, boolean randomiseNext) { + // Replace any later scheduled task for this plugin + long due = clock.currentTimeMillis() + delay; + TransportId t = p.getId(); + lock.lock(); + try { + ScheduledPollTask scheduled = tasks.get(t); + if (scheduled == null || due < scheduled.task.due) { + // If a later task exists, cancel it. If it's already started + // it will abort safely when it finds it's been replaced + if (scheduled != null) scheduled.future.cancel(false); + PollTask task = new PollTask(p, due, randomiseNext); + Future future = scheduler.schedule(() -> + ioExecutor.execute(task), delay, MILLISECONDS); + tasks.put(t, new ScheduledPollTask(task, future)); + } + } finally { + lock.unlock(); + } + } + + private void cancel(TransportId t) { + lock.lock(); + try { + ScheduledPollTask scheduled = tasks.remove(t); + if (scheduled != null) scheduled.future.cancel(false); + } finally { + lock.unlock(); + } + } + + @IoExecutor + private void poll(Plugin p) { + TransportId t = p.getId(); + if (LOG.isLoggable(INFO)) LOG.info("Polling plugin " + t); + try { + Map remote = + transportPropertyManager.getRemoteProperties(t); + Collection connected = + connectionRegistry.getConnectedContacts(t); + remote = new HashMap<>(remote); + remote.keySet().removeAll(connected); + if (!remote.isEmpty()) p.poll(remote); + } catch (DbException e) { + logException(LOG, WARNING, e); + } + } + + private class ScheduledPollTask { + + private final PollTask task; + private final Future future; + + private ScheduledPollTask(PollTask task, Future future) { + this.task = task; + this.future = future; + } + } + + private class PollTask implements Runnable { + + private final Plugin plugin; + private final long due; + private final boolean randomiseNext; + + private PollTask(Plugin plugin, long due, boolean randomiseNext) { + this.plugin = plugin; + this.due = due; + this.randomiseNext = randomiseNext; + } + + @Override + @IoExecutor + public void run() { + lock.lock(); + try { + TransportId t = plugin.getId(); + ScheduledPollTask scheduled = tasks.get(t); + if (scheduled != null && scheduled.task != this) + return; // Replaced by another task + tasks.remove(t); + } finally { + lock.unlock(); + } + int delay = plugin.getPollingInterval(); + if (randomiseNext) delay = (int) (delay * random.nextDouble()); + schedule(plugin, delay, false); + poll(plugin); + } + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PluginManagerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PluginManagerImplTest.java index 0cd09917d..55ad46757 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PluginManagerImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PluginManagerImplTest.java @@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.plugin.ConnectionManager; -import org.briarproject.bramble.api.plugin.ConnectionRegistry; import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.PluginException; import org.briarproject.bramble.api.plugin.TransportId; @@ -14,18 +13,15 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPluginCallback; import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.settings.SettingsManager; -import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.test.BrambleTestCase; import org.jmock.Expectations; import org.jmock.Mockery; import org.jmock.lib.concurrent.Synchroniser; import org.junit.Test; -import java.security.SecureRandom; import java.util.Arrays; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import static org.briarproject.bramble.test.TestUtils.getTransportId; @@ -37,16 +33,10 @@ public class PluginManagerImplTest extends BrambleTestCase { setThreadingPolicy(new Synchroniser()); }}; Executor ioExecutor = Executors.newSingleThreadExecutor(); - ScheduledExecutorService scheduler = - context.mock(ScheduledExecutorService.class); - SecureRandom random = new SecureRandom(); - Clock clock = context.mock(Clock.class); EventBus eventBus = context.mock(EventBus.class); PluginConfig pluginConfig = context.mock(PluginConfig.class); ConnectionManager connectionManager = context.mock(ConnectionManager.class); - ConnectionRegistry connectionRegistry = - context.mock(ConnectionRegistry.class); SettingsManager settingsManager = context.mock(SettingsManager.class); TransportPropertyManager transportPropertyManager = @@ -122,9 +112,9 @@ public class PluginManagerImplTest extends BrambleTestCase { oneOf(duplexPlugin).stop(); }}); - PluginManagerImpl p = new PluginManagerImpl(ioExecutor, scheduler, - eventBus, pluginConfig, connectionManager, connectionRegistry, - settingsManager, transportPropertyManager, random, clock); + PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus, + pluginConfig, connectionManager, settingsManager, + transportPropertyManager); // Two plugins should be started and stopped p.startService(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java similarity index 88% rename from bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerTest.java rename to bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java index 2fca78405..9b038ef0a 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java @@ -23,6 +23,7 @@ import org.briarproject.bramble.test.ImmediateExecutor; import org.briarproject.bramble.test.RunAction; import org.jmock.Expectations; import org.jmock.lib.legacy.ClassImposteriser; +import org.junit.Before; import org.junit.Test; import java.security.SecureRandom; @@ -39,7 +40,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getTransportId; -public class PollerTest extends BrambleMockTestCase { +public class PollerImplTest extends BrambleMockTestCase { private final ScheduledExecutorService scheduler = context.mock(ScheduledExecutorService.class); @@ -62,11 +63,20 @@ public class PollerTest extends BrambleMockTestCase { private final int pollingInterval = 60 * 1000; private final long now = System.currentTimeMillis(); - public PollerTest() { + private PollerImpl poller; + + public PollerImplTest() { context.setImposteriser(ClassImposteriser.INSTANCE); random = context.mock(SecureRandom.class); } + @Before + public void setUp() { + poller = new PollerImpl(ioExecutor, scheduler, connectionManager, + connectionRegistry, pluginManager, transportPropertyManager, + random, clock); + } + @Test public void testConnectOnContactAdded() throws Exception { // Two simplex plugins: one supports polling, the other doesn't @@ -140,11 +150,7 @@ public class PollerTest extends BrambleMockTestCase { will(returnValue(false)); }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new ContactAddedEvent(contactId)); + poller.eventOccurred(new ContactAddedEvent(contactId)); } @Test @@ -194,11 +200,7 @@ public class PollerTest extends BrambleMockTestCase { transportId, duplexConnection); }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new ConnectionClosedEvent(contactId, transportId, + poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId, false)); } @@ -225,11 +227,7 @@ public class PollerTest extends BrambleMockTestCase { will(returnValue(future)); }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, false)); } @@ -269,13 +267,9 @@ public class PollerTest extends BrambleMockTestCase { will(returnValue(now + 1)); }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, false)); - p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, false)); } @@ -318,13 +312,9 @@ public class PollerTest extends BrambleMockTestCase { with((long) pollingInterval - 2), with(MILLISECONDS)); }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, false)); - p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, false)); } @@ -367,11 +357,7 @@ public class PollerTest extends BrambleMockTestCase { oneOf(plugin).poll(singletonMap(contactId, properties)); }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new TransportEnabledEvent(transportId)); + poller.eventOccurred(new TransportEnabledEvent(transportId)); } @Test @@ -412,11 +398,7 @@ public class PollerTest extends BrambleMockTestCase { // All contacts are connected, so don't poll the plugin }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new TransportEnabledEvent(transportId)); + poller.eventOccurred(new TransportEnabledEvent(transportId)); } @Test @@ -442,11 +424,7 @@ public class PollerTest extends BrambleMockTestCase { oneOf(future).cancel(false); }}); - Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, transportPropertyManager, - random, clock); - - p.eventOccurred(new TransportEnabledEvent(transportId)); - p.eventOccurred(new TransportDisabledEvent(transportId)); + poller.eventOccurred(new TransportEnabledEvent(transportId)); + poller.eventOccurred(new TransportDisabledEvent(transportId)); } }