Moved all automatic connection logic into poller.

This commit is contained in:
akwizgran
2016-05-05 14:48:28 +01:00
parent ff8301521c
commit 5044f34ba9
9 changed files with 411 additions and 341 deletions

View File

@@ -2,24 +2,39 @@ package org.briarproject.api.plugins;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.plugins.duplex.DuplexPlugin; import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import java.util.Collection; import java.util.Collection;
/** /**
* Responsible for starting transport plugins at startup, stopping them at * Responsible for starting transport plugins at startup and stopping them at
* shutdown, and providing access to plugins for exchanging invitations. * shutdown.
*/ */
public interface PluginManager { public interface PluginManager {
/** /**
* Returns the plugin for the given transport, or null if no such plugin * Returns the plugin for the given transport, or null if no such plugin
* is running. * has been created.
*/ */
Plugin getPlugin(TransportId t); Plugin getPlugin(TransportId t);
/** Returns any running duplex plugins that support invitations. */ /**
* Returns any simplex plugins that have been created.
*/
Collection<SimplexPlugin> getSimplexPlugins();
/**
* Returns any duplex plugins that have been created.
*/
Collection<DuplexPlugin> getDuplexPlugins();
/**
* Returns any duplex plugins that support invitations.
*/
Collection<DuplexPlugin> getInvitationPlugins(); Collection<DuplexPlugin> getInvitationPlugins();
/** Returns any running duplex plugins that support key agreement. */ /**
* Returns any duplex plugins that support key agreement.
*/
Collection<DuplexPlugin> getKeyAgreementPlugins(); Collection<DuplexPlugin> getKeyAgreementPlugins();
} }

View File

@@ -3,18 +3,13 @@ package org.briarproject.plugins;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId; import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DbException; import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus; import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.TransportDisabledEvent; import org.briarproject.api.event.TransportDisabledEvent;
import org.briarproject.api.event.TransportEnabledEvent; import org.briarproject.api.event.TransportEnabledEvent;
import org.briarproject.api.lifecycle.IoExecutor; import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.Service; import org.briarproject.api.lifecycle.Service;
import org.briarproject.api.lifecycle.ServiceException; import org.briarproject.api.lifecycle.ServiceException;
import org.briarproject.api.plugins.ConnectionManager; import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin; import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback; import org.briarproject.api.plugins.PluginCallback;
import org.briarproject.api.plugins.PluginConfig; import org.briarproject.api.plugins.PluginConfig;
@@ -51,7 +46,7 @@ import javax.inject.Inject;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
class PluginManagerImpl implements PluginManager, Service, EventListener { class PluginManagerImpl implements PluginManager, Service {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(PluginManagerImpl.class.getName()); Logger.getLogger(PluginManagerImpl.class.getName());
@@ -59,9 +54,7 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
private final Executor ioExecutor; private final Executor ioExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final PluginConfig pluginConfig; private final PluginConfig pluginConfig;
private final Poller poller;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry;
private final SettingsManager settingsManager; private final SettingsManager settingsManager;
private final TransportPropertyManager transportPropertyManager; private final TransportPropertyManager transportPropertyManager;
private final UiCallback uiCallback; private final UiCallback uiCallback;
@@ -71,18 +64,14 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
@Inject @Inject
PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus, PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus,
PluginConfig pluginConfig, Poller poller, PluginConfig pluginConfig, ConnectionManager connectionManager,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry,
SettingsManager settingsManager, SettingsManager settingsManager,
TransportPropertyManager transportPropertyManager, TransportPropertyManager transportPropertyManager,
UiCallback uiCallback) { UiCallback uiCallback) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.pluginConfig = pluginConfig; this.pluginConfig = pluginConfig;
this.poller = poller;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.settingsManager = settingsManager; this.settingsManager = settingsManager;
this.transportPropertyManager = transportPropertyManager; this.transportPropertyManager = transportPropertyManager;
this.uiCallback = uiCallback; this.uiCallback = uiCallback;
@@ -93,36 +82,55 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
@Override @Override
public void startService() throws ServiceException { public void startService() throws ServiceException {
Collection<SimplexPluginFactory> simplexFactories =
pluginConfig.getSimplexFactories();
Collection<DuplexPluginFactory> duplexFactories =
pluginConfig.getDuplexFactories();
int numPlugins = simplexFactories.size() + duplexFactories.size();
CountDownLatch latch = new CountDownLatch(numPlugins);
// Instantiate and start the simplex plugins // Instantiate and start the simplex plugins
LOG.info("Starting simplex plugins"); LOG.info("Starting simplex plugins");
Collection<SimplexPluginFactory> sFactories = for (SimplexPluginFactory f : simplexFactories) {
pluginConfig.getSimplexFactories(); TransportId t = f.getId();
final CountDownLatch sLatch = new CountDownLatch(sFactories.size()); SimplexPluginCallback c = new SimplexCallback(t);
for (SimplexPluginFactory factory : sFactories) SimplexPlugin s = f.createPlugin(c);
ioExecutor.execute(new SimplexPluginStarter(factory, sLatch)); if (s == null) {
if (LOG.isLoggable(WARNING))
LOG.warning("Could not create plugin for " + t);
latch.countDown();
} else {
plugins.put(t, s);
simplexPlugins.add(s);
ioExecutor.execute(new PluginStarter(s, latch));
}
}
// Instantiate and start the duplex plugins // Instantiate and start the duplex plugins
LOG.info("Starting duplex plugins"); LOG.info("Starting duplex plugins");
Collection<DuplexPluginFactory> dFactories = for (DuplexPluginFactory f : duplexFactories) {
pluginConfig.getDuplexFactories(); TransportId t = f.getId();
final CountDownLatch dLatch = new CountDownLatch(dFactories.size()); DuplexPluginCallback c = new DuplexCallback(t);
for (DuplexPluginFactory factory : dFactories) DuplexPlugin d = f.createPlugin(c);
ioExecutor.execute(new DuplexPluginStarter(factory, dLatch)); if (d == null) {
// Wait for the plugins to start if (LOG.isLoggable(WARNING))
LOG.warning("Could not create plugin for " + t);
latch.countDown();
} else {
plugins.put(t, d);
duplexPlugins.add(d);
ioExecutor.execute(new PluginStarter(d, latch));
}
}
// Wait for all the plugins to start
try { try {
sLatch.await(); latch.await();
dLatch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
// Listen for events
eventBus.addListener(this);
} }
@Override @Override
public void stopService() throws ServiceException { public void stopService() throws ServiceException {
// Stop listening for events CountDownLatch latch = new CountDownLatch(plugins.size());
eventBus.removeListener(this);
final CountDownLatch latch = new CountDownLatch(plugins.size());
// Stop the simplex plugins // Stop the simplex plugins
LOG.info("Stopping simplex plugins"); LOG.info("Stopping simplex plugins");
for (SimplexPlugin plugin : simplexPlugins) for (SimplexPlugin plugin : simplexPlugins)
@@ -144,6 +152,18 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
return plugins.get(t); return plugins.get(t);
} }
@Override
public Collection<SimplexPlugin> getSimplexPlugins() {
List<SimplexPlugin> copy = new ArrayList<SimplexPlugin>(simplexPlugins);
return Collections.unmodifiableList(copy);
}
@Override
public Collection<DuplexPlugin> getDuplexPlugins() {
List<DuplexPlugin> copy = new ArrayList<DuplexPlugin>(duplexPlugins);
return Collections.unmodifiableList(copy);
}
@Override @Override
public Collection<DuplexPlugin> getInvitationPlugins() { public Collection<DuplexPlugin> getInvitationPlugins() {
List<DuplexPlugin> supported = new ArrayList<DuplexPlugin>(); List<DuplexPlugin> supported = new ArrayList<DuplexPlugin>();
@@ -160,149 +180,24 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
return Collections.unmodifiableList(supported); return Collections.unmodifiableList(supported);
} }
@Override private class PluginStarter implements Runnable {
public void eventOccurred(Event e) {
if (e instanceof ContactStatusChangedEvent) {
ContactStatusChangedEvent c = (ContactStatusChangedEvent) e;
if (c.isActive()) {
// Connect to the newly activated contact
connectToContact(c.getContactId());
}
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
}
}
private void connectToContact(ContactId c) { private final Plugin plugin;
for (SimplexPlugin s : simplexPlugins)
if (s.shouldPoll()) connectToContact(c, s);
for (DuplexPlugin d : duplexPlugins)
if (d.shouldPoll()) connectToContact(c, d);
}
private void connectToContact(ContactId c, TransportId t) {
Plugin p = plugins.get(t);
if (p instanceof SimplexPlugin && p.shouldPoll())
connectToContact(c, (SimplexPlugin) p);
else if (p instanceof DuplexPlugin && p.shouldPoll())
connectToContact(c, (DuplexPlugin) p);
}
private void connectToContact(final ContactId c, final SimplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
TransportConnectionWriter w = p.createWriter(c);
if (w != null)
connectionManager.manageOutgoingConnection(c, t, w);
}
}
});
}
private void connectToContact(final ContactId c, final DuplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
DuplexTransportConnection d = p.createConnection(c);
if (d != null)
connectionManager.manageOutgoingConnection(c, t, d);
}
}
});
}
private class SimplexPluginStarter implements Runnable {
private final SimplexPluginFactory factory;
private final CountDownLatch latch; private final CountDownLatch latch;
private SimplexPluginStarter(SimplexPluginFactory factory, private PluginStarter(Plugin plugin, CountDownLatch latch) {
CountDownLatch latch) { this.plugin = plugin;
this.factory = factory;
this.latch = latch; this.latch = latch;
} }
@Override @Override
public void run() { public void run() {
try { try {
TransportId id = factory.getId();
SimplexCallback callback = new SimplexCallback(id);
SimplexPlugin plugin = factory.createPlugin(callback);
if (plugin == null) {
if (LOG.isLoggable(INFO)) {
String name = factory.getClass().getSimpleName();
LOG.info(name + " did not create a plugin");
}
return;
}
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
boolean started = plugin.start(); boolean started = plugin.start();
long duration = System.currentTimeMillis() - start; long duration = System.currentTimeMillis() - start;
if (started) { if (started) {
plugins.put(id, plugin);
simplexPlugins.add(plugin);
if (LOG.isLoggable(INFO)) {
String name = plugin.getClass().getSimpleName();
LOG.info("Starting " + name + " took " +
duration + " ms");
}
} else {
if (LOG.isLoggable(WARNING)) {
String name = plugin.getClass().getSimpleName();
LOG.warning(name + " did not start");
}
}
} catch (IOException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
} finally {
latch.countDown();
}
}
}
private class DuplexPluginStarter implements Runnable {
private final DuplexPluginFactory factory;
private final CountDownLatch latch;
private DuplexPluginStarter(DuplexPluginFactory factory,
CountDownLatch latch) {
this.factory = factory;
this.latch = latch;
}
@Override
public void run() {
try {
TransportId id = factory.getId();
DuplexCallback callback = new DuplexCallback(id);
DuplexPlugin plugin = factory.createPlugin(callback);
if (plugin == null) {
if (LOG.isLoggable(INFO)) {
String name = factory.getClass().getSimpleName();
LOG.info(name + " did not create a plugin");
}
return;
}
try {
long start = System.currentTimeMillis();
boolean started = plugin.start();
long duration = System.currentTimeMillis() - start;
if (started) {
plugins.put(id, plugin);
duplexPlugins.add(plugin);
if (LOG.isLoggable(INFO)) { if (LOG.isLoggable(INFO)) {
String name = plugin.getClass().getSimpleName(); String name = plugin.getClass().getSimpleName();
LOG.info("Starting " + name + " took " + LOG.info("Starting " + name + " took " +
@@ -428,8 +323,6 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
@Override @Override
public void transportEnabled() { public void transportEnabled() {
eventBus.broadcast(new TransportEnabledEvent(id)); eventBus.broadcast(new TransportEnabledEvent(id));
Plugin p = plugins.get(id);
if (p != null && p.shouldPoll()) poller.pollNow(p);
} }
@Override @Override

View File

@@ -26,6 +26,8 @@ public class PluginsModule {
public static class EagerSingletons { public static class EagerSingletons {
@Inject @Inject
PluginManager pluginManager; PluginManager pluginManager;
@Inject
Poller poller;
} }
@Provides @Provides
@@ -35,7 +37,8 @@ public class PluginsModule {
@Provides @Provides
@Singleton @Singleton
Poller providePoller(PollerImpl poller) { Poller providePoller(EventBus eventBus, PollerImpl poller) {
eventBus.addListener(poller);
return poller; return poller;
} }

View File

@@ -1,9 +1,6 @@
package org.briarproject.plugins; package org.briarproject.plugins;
import org.briarproject.api.plugins.Plugin;
interface Poller { interface Poller {
/** Tells the poller to poll the given plugin immediately. */ // TODO: Remove this interface
void pollNow(Plugin p);
} }

View File

@@ -1,9 +1,21 @@
package org.briarproject.plugins; package org.briarproject.plugins;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.TransportEnabledEvent;
import org.briarproject.api.lifecycle.IoExecutor; import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry; import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin; import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.Map; import java.util.Map;
@@ -17,30 +29,100 @@ import javax.inject.Inject;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
class PollerImpl implements Poller { class PollerImpl implements Poller, EventListener {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(PollerImpl.class.getName()); Logger.getLogger(PollerImpl.class.getName());
private final Executor ioExecutor; private final Executor ioExecutor;
private final ScheduledExecutorService scheduler; private final ScheduledExecutorService scheduler;
private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry; private final ConnectionRegistry connectionRegistry;
private final PluginManager pluginManager;
private final SecureRandom random; private final SecureRandom random;
private final Map<TransportId, PollTask> tasks; private final Map<TransportId, PollTask> tasks;
@Inject @Inject
PollerImpl(@IoExecutor Executor ioExecutor, PollerImpl(@IoExecutor Executor ioExecutor,
ScheduledExecutorService scheduler, ScheduledExecutorService scheduler,
ConnectionRegistry connectionRegistry, SecureRandom random) { ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, PluginManager pluginManager,
SecureRandom random) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry; this.connectionRegistry = connectionRegistry;
this.pluginManager = pluginManager;
this.random = random; this.random = random;
this.scheduler = scheduler; this.scheduler = scheduler;
tasks = new ConcurrentHashMap<TransportId, PollTask>(); tasks = new ConcurrentHashMap<TransportId, PollTask>();
} }
@Override @Override
public void pollNow(Plugin p) { public void eventOccurred(Event e) {
if (e instanceof ContactStatusChangedEvent) {
ContactStatusChangedEvent c = (ContactStatusChangedEvent) e;
if (c.isActive()) {
// Connect to the newly activated contact
connectToContact(c.getContactId());
}
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
} else if (e instanceof TransportEnabledEvent) {
TransportEnabledEvent t = (TransportEnabledEvent) e;
Plugin p = pluginManager.getPlugin(t.getTransportId());
if (p.shouldPoll()) pollNow(p);
}
}
private void connectToContact(ContactId c) {
for (SimplexPlugin s : pluginManager.getSimplexPlugins())
if (s.shouldPoll()) connectToContact(c, s);
for (DuplexPlugin d : pluginManager.getDuplexPlugins())
if (d.shouldPoll()) connectToContact(c, d);
}
private void connectToContact(ContactId c, TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p instanceof SimplexPlugin && p.shouldPoll())
connectToContact(c, (SimplexPlugin) p);
else if (p instanceof DuplexPlugin && p.shouldPoll())
connectToContact(c, (DuplexPlugin) p);
}
private void connectToContact(final ContactId c, final SimplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
TransportConnectionWriter w = p.createWriter(c);
if (w != null)
connectionManager.manageOutgoingConnection(c, t, w);
}
}
});
}
private void connectToContact(final ContactId c, final DuplexPlugin p) {
ioExecutor.execute(new Runnable() {
@Override
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
DuplexTransportConnection d = p.createConnection(c);
if (d != null)
connectionManager.manageOutgoingConnection(c, t, d);
}
}
});
}
private void pollNow(Plugin p) {
// Randomise next polling interval // Randomise next polling interval
schedule(p, 0, true); schedule(p, 0, true);
} }

View File

@@ -0,0 +1,20 @@
package org.briarproject;
import org.hamcrest.Description;
import org.jmock.api.Action;
import org.jmock.api.Invocation;
public class RunAction implements Action {
@Override
public Object invoke(Invocation invocation) throws Throwable {
Runnable task = (Runnable) invocation.getParameter(0);
task.run();
return null;
}
@Override
public void describeTo(Description description) {
description.appendText("runs a runnable");
}
}

View File

@@ -1,20 +1,13 @@
package org.briarproject.plugins; package org.briarproject.plugins;
import org.briarproject.BriarTestCase; import org.briarproject.BriarTestCase;
import org.briarproject.ImmediateExecutor;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.EventBus; import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.plugins.ConnectionManager; import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.PluginConfig; import org.briarproject.api.plugins.PluginConfig;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin; import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback; import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory; import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexPlugin; import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.briarproject.api.plugins.simplex.SimplexPluginCallback; import org.briarproject.api.plugins.simplex.SimplexPluginCallback;
import org.briarproject.api.plugins.simplex.SimplexPluginFactory; import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
@@ -40,11 +33,8 @@ public class PluginManagerImplTest extends BriarTestCase {
final Executor ioExecutor = Executors.newSingleThreadExecutor(); final Executor ioExecutor = Executors.newSingleThreadExecutor();
final EventBus eventBus = context.mock(EventBus.class); final EventBus eventBus = context.mock(EventBus.class);
final PluginConfig pluginConfig = context.mock(PluginConfig.class); final PluginConfig pluginConfig = context.mock(PluginConfig.class);
final Poller poller = context.mock(Poller.class);
final ConnectionManager connectionManager = final ConnectionManager connectionManager =
context.mock(ConnectionManager.class); context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final SettingsManager settingsManager = final SettingsManager settingsManager =
context.mock(SettingsManager.class); context.mock(SettingsManager.class);
final TransportPropertyManager transportPropertyManager = final TransportPropertyManager transportPropertyManager =
@@ -108,19 +98,16 @@ public class PluginManagerImplTest extends BriarTestCase {
oneOf(duplexFailFactory).createPlugin(with(any( oneOf(duplexFailFactory).createPlugin(with(any(
DuplexPluginCallback.class))); DuplexPluginCallback.class)));
will(returnValue(null)); // Failed to create a plugin will(returnValue(null)); // Failed to create a plugin
// Start listening for events
oneOf(eventBus).addListener(with(any(EventListener.class)));
// stop() // stop()
// Stop listening for events
oneOf(eventBus).removeListener(with(any(EventListener.class)));
// Stop the plugins // Stop the plugins
oneOf(simplexPlugin).stop(); oneOf(simplexPlugin).stop();
oneOf(simplexFailPlugin).stop();
oneOf(duplexPlugin).stop(); oneOf(duplexPlugin).stop();
}}); }});
PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus, PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus,
pluginConfig, poller, connectionManager, connectionRegistry, pluginConfig, connectionManager, settingsManager,
settingsManager, transportPropertyManager, uiCallback); transportPropertyManager, uiCallback);
// Two plugins should be started and stopped // Two plugins should be started and stopped
p.startService(); p.startService();
@@ -128,139 +115,4 @@ public class PluginManagerImplTest extends BriarTestCase {
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@Test
public void testConnectToNewContact() throws Exception {
Mockery context = new Mockery();
final Executor ioExecutor = new ImmediateExecutor();
final EventBus eventBus = context.mock(EventBus.class);
final PluginConfig pluginConfig = context.mock(PluginConfig.class);
final Poller poller = context.mock(Poller.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final SettingsManager settingsManager =
context.mock(SettingsManager.class);
final TransportPropertyManager transportPropertyManager =
context.mock(TransportPropertyManager.class);
final UiCallback uiCallback = context.mock(UiCallback.class);
final TransportConnectionWriter transportConnectionWriter =
context.mock(TransportConnectionWriter.class);
final DuplexTransportConnection duplexTransportConnection =
context.mock(DuplexTransportConnection.class);
final ContactId contactId = new ContactId(234);
// Two simplex plugins: one supports polling, the other doesn't
final SimplexPluginFactory simplexFactory =
context.mock(SimplexPluginFactory.class);
final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class);
final TransportId simplexId = new TransportId("simplex");
final SimplexPluginFactory simplexFactory1 =
context.mock(SimplexPluginFactory.class, "simplexFactory1");
final SimplexPlugin simplexPlugin1 =
context.mock(SimplexPlugin.class, "simplexPlugin1");
final TransportId simplexId1 = new TransportId("simplex1");
// Two duplex plugins: one supports polling, the other doesn't
final DuplexPluginFactory duplexFactory =
context.mock(DuplexPluginFactory.class);
final DuplexPlugin duplexPlugin = context.mock(DuplexPlugin.class);
final TransportId duplexId = new TransportId("duplex");
final DuplexPluginFactory duplexFactory1 =
context.mock(DuplexPluginFactory.class, "duplexFactory1");
final DuplexPlugin duplexPlugin1 =
context.mock(DuplexPlugin.class, "duplexPlugin1");
final TransportId duplexId1 = new TransportId("duplex1");
context.checking(new Expectations() {{
// start()
// First simplex plugin
oneOf(pluginConfig).getSimplexFactories();
will(returnValue(Arrays.asList(simplexFactory, simplexFactory1)));
oneOf(simplexFactory).getId();
will(returnValue(simplexId));
oneOf(simplexFactory).createPlugin(with(any(
SimplexPluginCallback.class)));
will(returnValue(simplexPlugin)); // Created
oneOf(simplexPlugin).start();
will(returnValue(true)); // Started
// Second simplex plugin
oneOf(simplexFactory1).getId();
will(returnValue(simplexId1));
oneOf(simplexFactory1).createPlugin(with(any(
SimplexPluginCallback.class)));
will(returnValue(simplexPlugin1)); // Created
oneOf(simplexPlugin1).start();
will(returnValue(true)); // Started
// First duplex plugin
oneOf(pluginConfig).getDuplexFactories();
will(returnValue(Arrays.asList(duplexFactory, duplexFactory1)));
oneOf(duplexFactory).getId();
will(returnValue(duplexId));
oneOf(duplexFactory).createPlugin(with(any(
DuplexPluginCallback.class)));
will(returnValue(duplexPlugin)); // Created
oneOf(duplexPlugin).start();
will(returnValue(true)); // Started
// Second duplex plugin
oneOf(duplexFactory1).getId();
will(returnValue(duplexId1));
oneOf(duplexFactory1).createPlugin(with(any(
DuplexPluginCallback.class)));
will(returnValue(duplexPlugin1)); // Created
oneOf(duplexPlugin1).start();
will(returnValue(true)); // Started
// Start listening for events
oneOf(eventBus).addListener(with(any(EventListener.class)));
// eventOccurred()
// First simplex plugin
oneOf(simplexPlugin).shouldPoll();
will(returnValue(true));
oneOf(simplexPlugin).getId();
will(returnValue(simplexId));
oneOf(connectionRegistry).isConnected(contactId, simplexId);
will(returnValue(false));
oneOf(simplexPlugin).createWriter(contactId);
will(returnValue(transportConnectionWriter));
oneOf(connectionManager).manageOutgoingConnection(contactId,
simplexId, transportConnectionWriter);
// Second simplex plugin
oneOf(simplexPlugin1).shouldPoll();
will(returnValue(false));
// First duplex plugin
oneOf(duplexPlugin).shouldPoll();
will(returnValue(true));
oneOf(duplexPlugin).getId();
will(returnValue(duplexId));
oneOf(connectionRegistry).isConnected(contactId, duplexId);
will(returnValue(false));
oneOf(duplexPlugin).createConnection(contactId);
will(returnValue(duplexTransportConnection));
oneOf(connectionManager).manageOutgoingConnection(contactId,
duplexId, duplexTransportConnection);
// Second duplex plugin
oneOf(duplexPlugin1).shouldPoll();
will(returnValue(false));
// stop()
// Stop listening for events
oneOf(eventBus).removeListener(with(any(EventListener.class)));
// Stop the plugins
oneOf(simplexPlugin).stop();
oneOf(simplexPlugin1).stop();
oneOf(duplexPlugin).stop();
oneOf(duplexPlugin1).stop();
}});
PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus,
pluginConfig, poller, connectionManager, connectionRegistry,
settingsManager, transportPropertyManager, uiCallback);
p.startService();
p.eventOccurred(new ContactStatusChangedEvent(contactId, true));
p.stopService();
context.assertIsSatisfied();
}
} }

View File

@@ -0,0 +1,222 @@
package org.briarproject.plugins;
import org.briarproject.BriarTestCase;
import org.briarproject.ImmediateExecutor;
import org.briarproject.RunAction;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.TransportEnabledEvent;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.Test;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class PollerImplTest extends BriarTestCase {
private final ContactId contactId = new ContactId(234);
@Test
public void testConnectToNewContact() throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
final ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
// Two simplex plugins: one supports polling, the other doesn't
final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class);
final SimplexPlugin simplexPlugin1 =
context.mock(SimplexPlugin.class, "simplexPlugin1");
final TransportId simplexId1 = new TransportId("simplex1");
final List<SimplexPlugin> simplexPlugins = Arrays.asList(simplexPlugin,
simplexPlugin1);
final TransportConnectionWriter simplexWriter =
context.mock(TransportConnectionWriter.class);
// Two duplex plugins: one supports polling, the other doesn't
final DuplexPlugin duplexPlugin = context.mock(DuplexPlugin.class);
final TransportId duplexId = new TransportId("duplex");
final DuplexPlugin duplexPlugin1 =
context.mock(DuplexPlugin.class, "duplexPlugin1");
final List<DuplexPlugin> duplexPlugins = Arrays.asList(duplexPlugin,
duplexPlugin1);
final DuplexTransportConnection duplexConnection =
context.mock(DuplexTransportConnection.class);
context.checking(new Expectations() {{
// Get the simplex plugins
oneOf(pluginManager).getSimplexPlugins();
will(returnValue(simplexPlugins));
// The first plugin doesn't support polling
oneOf(simplexPlugin).shouldPoll();
will(returnValue(false));
// The second plugin supports polling
oneOf(simplexPlugin1).shouldPoll();
will(returnValue(true));
// Check whether the contact is already connected
oneOf(simplexPlugin1).getId();
will(returnValue(simplexId1));
oneOf(connectionRegistry).isConnected(contactId, simplexId1);
will(returnValue(false));
// Connect to the contact
oneOf(simplexPlugin1).createWriter(contactId);
will(returnValue(simplexWriter));
// Pass the connection to the connection manager
oneOf(connectionManager).manageOutgoingConnection(contactId,
simplexId1, simplexWriter);
// Get the duplex plugins
oneOf(pluginManager).getDuplexPlugins();
will(returnValue(duplexPlugins));
// The first plugin supports polling
oneOf(duplexPlugin).shouldPoll();
will(returnValue(true));
// Check whether the contact is already connected
oneOf(duplexPlugin).getId();
will(returnValue(duplexId));
oneOf(connectionRegistry).isConnected(contactId, duplexId);
will(returnValue(false));
// Connect to the contact
oneOf(duplexPlugin).createConnection(contactId);
will(returnValue(duplexConnection));
// Pass the connection to the connection manager
oneOf(connectionManager).manageOutgoingConnection(contactId,
duplexId, duplexConnection);
// The second plugin doesn't support polling
oneOf(duplexPlugin1).shouldPoll();
will(returnValue(false));
}});
PollerImpl p = new PollerImpl(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random);
p.eventOccurred(new ContactStatusChangedEvent(contactId, true));
context.assertIsSatisfied();
}
@Test
public void testReconnectToDisconnectedContact() throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
final ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
final DuplexPlugin plugin = context.mock(DuplexPlugin.class);
final TransportId transportId = new TransportId("id");
final DuplexTransportConnection duplexConnection =
context.mock(DuplexTransportConnection.class);
context.checking(new Expectations() {{
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Check whether the contact is already connected
oneOf(plugin).getId();
will(returnValue(transportId));
oneOf(connectionRegistry).isConnected(contactId, transportId);
will(returnValue(false));
// Connect to the contact
oneOf(plugin).createConnection(contactId);
will(returnValue(duplexConnection));
// Pass the connection to the connection manager
oneOf(connectionManager).manageOutgoingConnection(contactId,
transportId, duplexConnection);
}});
PollerImpl p = new PollerImpl(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random);
p.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
false));
context.assertIsSatisfied();
}
@Test
public void testPollWhenTransportIsEnabled() throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
final ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
final Plugin plugin = context.mock(Plugin.class);
final TransportId transportId = new TransportId("id");
final int pollingInterval = 60 * 1000;
final List<ContactId> connected = Collections.singletonList(contactId);
context.checking(new Expectations() {{
allowing(plugin).getId();
will(returnValue(transportId));
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Schedule a polling task immediately
oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L),
with(MILLISECONDS));
will(new RunAction());
// Run the polling task
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(random).nextDouble();
will(returnValue(0.5));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) (pollingInterval * 0.5)), with(MILLISECONDS));
// Poll the plugin
oneOf(connectionRegistry).getConnectedContacts(transportId);
will(returnValue(connected));
oneOf(plugin).poll(connected);
}});
PollerImpl p = new PollerImpl(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random);
p.eventOccurred(new TransportEnabledEvent(transportId));
context.assertIsSatisfied();
}
}

View File

@@ -1,6 +1,7 @@
package org.briarproject.transport; package org.briarproject.transport;
import org.briarproject.BriarTestCase; import org.briarproject.BriarTestCase;
import org.briarproject.RunAction;
import org.briarproject.TestUtils; import org.briarproject.TestUtils;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId; import org.briarproject.api.contact.ContactId;
@@ -501,19 +502,4 @@ public class TransportKeyManagerTest extends BriarTestCase {
description.appendText("encodes a tag"); description.appendText("encodes a tag");
} }
} }
private static class RunAction implements Action {
@Override
public Object invoke(Invocation invocation) throws Throwable {
Runnable task = (Runnable) invocation.getParameter(0);
task.run();
return null;
}
@Override
public void describeTo(Description description) {
description.appendText("runs a runnable");
}
}
} }