Connect to newly activated contacts.

This commit is contained in:
akwizgran
2016-04-06 11:06:08 +01:00
parent 04a1f2b12c
commit befd916eba
4 changed files with 229 additions and 6 deletions

View File

@@ -16,5 +16,7 @@ public interface ConnectionRegistry {
Collection<ContactId> getConnectedContacts(TransportId t);
boolean isConnected(ContactId c, TransportId t);
boolean isConnected(ContactId c);
}

View File

@@ -114,6 +114,16 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
}
public boolean isConnected(ContactId c, TransportId t) {
lock.lock();
try {
Map<ContactId, Integer> m = connections.get(t);
return m != null && m.containsKey(c);
} finally {
lock.unlock();
}
}
public boolean isConnected(ContactId c) {
lock.lock();
try {

View File

@@ -3,13 +3,17 @@ package org.briarproject.plugins;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.TransportDisabledEvent;
import org.briarproject.api.event.TransportEnabledEvent;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.Service;
import org.briarproject.api.lifecycle.ServiceException;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback;
import org.briarproject.api.plugins.PluginConfig;
@@ -46,7 +50,7 @@ import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
class PluginManagerImpl implements PluginManager, Service {
class PluginManagerImpl implements PluginManager, Service, EventListener {
private static final Logger LOG =
Logger.getLogger(PluginManagerImpl.class.getName());
@@ -56,6 +60,7 @@ class PluginManagerImpl implements PluginManager, Service {
private final PluginConfig pluginConfig;
private final Poller poller;
private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry;
private final SettingsManager settingsManager;
private final TransportPropertyManager transportPropertyManager;
private final UiCallback uiCallback;
@@ -67,6 +72,7 @@ class PluginManagerImpl implements PluginManager, Service {
PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus,
PluginConfig pluginConfig, Poller poller,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry,
SettingsManager settingsManager,
TransportPropertyManager transportPropertyManager,
UiCallback uiCallback) {
@@ -75,6 +81,7 @@ class PluginManagerImpl implements PluginManager, Service {
this.pluginConfig = pluginConfig;
this.poller = poller;
this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.settingsManager = settingsManager;
this.transportPropertyManager = transportPropertyManager;
this.uiCallback = uiCallback;
@@ -106,10 +113,14 @@ class PluginManagerImpl implements PluginManager, Service {
} catch (InterruptedException e) {
throw new ServiceException(e);
}
// Listen for events
eventBus.addListener(this);
}
@Override
public void stopService() throws ServiceException {
// Stop listening for events
eventBus.removeListener(this);
// Stop the poller
LOG.info("Stopping poller");
poller.stop();
@@ -122,9 +133,6 @@ class PluginManagerImpl implements PluginManager, Service {
LOG.info("Stopping duplex plugins");
for (DuplexPlugin plugin : duplexPlugins)
ioExecutor.execute(new PluginStopper(plugin, latch));
plugins.clear();
simplexPlugins.clear();
duplexPlugins.clear();
// Wait for all the plugins to stop
try {
latch.await();
@@ -151,6 +159,47 @@ class PluginManagerImpl implements PluginManager, Service {
return Collections.unmodifiableList(supported);
}
@Override
public void eventOccurred(Event e) {
if (e instanceof ContactStatusChangedEvent) {
ContactStatusChangedEvent c = (ContactStatusChangedEvent) e;
if (c.isActive()) connectToContact(c.getContactId());
}
}
private void connectToContact(ContactId c) {
for (SimplexPlugin s : simplexPlugins)
if (s.shouldPoll()) connectToContact(c, s);
for (DuplexPlugin d : duplexPlugins)
if (d.shouldPoll()) connectToContact(c, d);
}
private void connectToContact(final ContactId c, final SimplexPlugin p) {
ioExecutor.execute(new Runnable() {
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
TransportConnectionWriter w = p.createWriter(c);
if (w != null)
connectionManager.manageOutgoingConnection(c, t, w);
}
}
});
}
private void connectToContact(final ContactId c, final DuplexPlugin p) {
ioExecutor.execute(new Runnable() {
public void run() {
TransportId t = p.getId();
if (!connectionRegistry.isConnected(c, t)) {
DuplexTransportConnection d = p.createConnection(c);
if (d != null)
connectionManager.manageOutgoingConnection(c, t, d);
}
}
});
}
private class SimplexPluginStarter implements Runnable {
private final SimplexPluginFactory factory;

View File

@@ -1,13 +1,20 @@
package org.briarproject.plugins;
import org.briarproject.BriarTestCase;
import org.briarproject.ImmediateExecutor;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.PluginConfig;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.briarproject.api.plugins.simplex.SimplexPluginCallback;
import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
@@ -36,6 +43,8 @@ public class PluginManagerImplTest extends BriarTestCase {
final Poller poller = context.mock(Poller.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final SettingsManager settingsManager =
context.mock(SettingsManager.class);
final TransportPropertyManager transportPropertyManager =
@@ -63,6 +72,7 @@ public class PluginManagerImplTest extends BriarTestCase {
final TransportId duplexFailId = new TransportId("duplex1");
context.checking(new Expectations() {{
// start()
// First simplex plugin
oneOf(pluginConfig).getSimplexFactories();
will(returnValue(Arrays.asList(simplexFactory,
@@ -103,6 +113,11 @@ public class PluginManagerImplTest extends BriarTestCase {
oneOf(duplexFailFactory).createPlugin(with(any(
DuplexPluginCallback.class)));
will(returnValue(null)); // Failed to create a plugin
// Start listening for events
oneOf(eventBus).addListener(with(any(EventListener.class)));
// stop()
// Stop listening for events
oneOf(eventBus).removeListener(with(any(EventListener.class)));
// Stop the poller
oneOf(poller).stop();
// Stop the plugins
@@ -111,8 +126,8 @@ public class PluginManagerImplTest extends BriarTestCase {
}});
PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus,
pluginConfig, poller, connectionManager, settingsManager,
transportPropertyManager, uiCallback);
pluginConfig, poller, connectionManager, connectionRegistry,
settingsManager, transportPropertyManager, uiCallback);
// Two plugins should be started and stopped
p.startService();
@@ -120,4 +135,151 @@ public class PluginManagerImplTest extends BriarTestCase {
context.assertIsSatisfied();
}
@Test
public void testConnectToNewContact() throws Exception {
Mockery context = new Mockery();
final Executor ioExecutor = new ImmediateExecutor();
final EventBus eventBus = context.mock(EventBus.class);
final PluginConfig pluginConfig = context.mock(PluginConfig.class);
final Poller poller = context.mock(Poller.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final SettingsManager settingsManager =
context.mock(SettingsManager.class);
final TransportPropertyManager transportPropertyManager =
context.mock(TransportPropertyManager.class);
final UiCallback uiCallback = context.mock(UiCallback.class);
final TransportConnectionWriter transportConnectionWriter =
context.mock(TransportConnectionWriter.class);
final DuplexTransportConnection duplexTransportConnection =
context.mock(DuplexTransportConnection.class);
final ContactId contactId = new ContactId(234);
// Two simplex plugins: one supports polling, the other doesn't
final SimplexPluginFactory simplexFactory =
context.mock(SimplexPluginFactory.class);
final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class);
final TransportId simplexId = new TransportId("simplex");
final SimplexPluginFactory simplexFactory1 =
context.mock(SimplexPluginFactory.class, "simplexFactory1");
final SimplexPlugin simplexPlugin1 =
context.mock(SimplexPlugin.class, "simplexPlugin1");
final TransportId simplexId1 = new TransportId("simplex1");
// Two duplex plugins: one supports polling, the other doesn't
final DuplexPluginFactory duplexFactory =
context.mock(DuplexPluginFactory.class);
final DuplexPlugin duplexPlugin = context.mock(DuplexPlugin.class);
final TransportId duplexId = new TransportId("duplex");
final DuplexPluginFactory duplexFactory1 =
context.mock(DuplexPluginFactory.class, "duplexFactory1");
final DuplexPlugin duplexPlugin1 =
context.mock(DuplexPlugin.class, "duplexPlugin1");
final TransportId duplexId1 = new TransportId("duplex1");
context.checking(new Expectations() {{
// start()
// First simplex plugin
oneOf(pluginConfig).getSimplexFactories();
will(returnValue(Arrays.asList(simplexFactory, simplexFactory1)));
oneOf(simplexFactory).getId();
will(returnValue(simplexId));
oneOf(simplexFactory).createPlugin(with(any(
SimplexPluginCallback.class)));
will(returnValue(simplexPlugin)); // Created
oneOf(simplexPlugin).start();
will(returnValue(true)); // Started
oneOf(simplexPlugin).shouldPoll();
will(returnValue(true)); // Should poll
oneOf(poller).addPlugin(simplexPlugin);
// Second simplex plugin
oneOf(simplexFactory1).getId();
will(returnValue(simplexId1));
oneOf(simplexFactory1).createPlugin(with(any(
SimplexPluginCallback.class)));
will(returnValue(simplexPlugin1)); // Created
oneOf(simplexPlugin1).start();
will(returnValue(true)); // Started
oneOf(simplexPlugin1).shouldPoll();
will(returnValue(false)); // Should not poll
// First duplex plugin
oneOf(pluginConfig).getDuplexFactories();
will(returnValue(Arrays.asList(duplexFactory, duplexFactory1)));
oneOf(duplexFactory).getId();
will(returnValue(duplexId));
oneOf(duplexFactory).createPlugin(with(any(
DuplexPluginCallback.class)));
will(returnValue(duplexPlugin)); // Created
oneOf(duplexPlugin).start();
will(returnValue(true)); // Started
oneOf(duplexPlugin).shouldPoll();
will(returnValue(true)); // Should poll
oneOf(poller).addPlugin(duplexPlugin);
// Second duplex plugin
oneOf(duplexFactory1).getId();
will(returnValue(duplexId1));
oneOf(duplexFactory1).createPlugin(with(any(
DuplexPluginCallback.class)));
will(returnValue(duplexPlugin1)); // Created
oneOf(duplexPlugin1).start();
will(returnValue(true)); // Started
oneOf(duplexPlugin1).shouldPoll();
will(returnValue(false)); // Should not poll
// Start listening for events
oneOf(eventBus).addListener(with(any(EventListener.class)));
// eventOccurred()
// First simplex plugin
oneOf(simplexPlugin).shouldPoll();
will(returnValue(true));
oneOf(simplexPlugin).getId();
will(returnValue(simplexId));
oneOf(connectionRegistry).isConnected(contactId, simplexId);
will(returnValue(false));
oneOf(simplexPlugin).createWriter(contactId);
will(returnValue(transportConnectionWriter));
oneOf(connectionManager).manageOutgoingConnection(contactId,
simplexId, transportConnectionWriter);
// Second simplex plugin
oneOf(simplexPlugin1).shouldPoll();
will(returnValue(false));
// First duplex plugin
oneOf(duplexPlugin).shouldPoll();
will(returnValue(true));
oneOf(duplexPlugin).getId();
will(returnValue(duplexId));
oneOf(connectionRegistry).isConnected(contactId, duplexId);
will(returnValue(false));
oneOf(duplexPlugin).createConnection(contactId);
will(returnValue(duplexTransportConnection));
oneOf(connectionManager).manageOutgoingConnection(contactId,
duplexId, duplexTransportConnection);
// Second duplex plugin
oneOf(duplexPlugin1).shouldPoll();
will(returnValue(false));
// stop()
// Stop listening for events
oneOf(eventBus).removeListener(with(any(EventListener.class)));
// Stop the poller
oneOf(poller).stop();
// Stop the plugins
oneOf(simplexPlugin).stop();
oneOf(simplexPlugin1).stop();
oneOf(duplexPlugin).stop();
oneOf(duplexPlugin1).stop();
}});
PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus,
pluginConfig, poller, connectionManager, connectionRegistry,
settingsManager, transportPropertyManager, uiCallback);
p.startService();
p.eventOccurred(new ContactStatusChangedEvent(contactId, true));
p.stopService();
context.assertIsSatisfied();
}
}