Merge branch '252-lan-polling' into 'master'

New polling logic for LAN. #252

Same approach as !104. Modified the poller to cancel any scheduled poll when pollNow() is called and randomise the next polling interval so plugins that call pollNow() at the same time don't end up polling in sync.

Depends on !104. Fixes #252.

See merge request !105
This commit is contained in:
akwizgran
2016-02-24 20:11:54 +00:00
13 changed files with 139 additions and 146 deletions

View File

@@ -1,11 +1,15 @@
package org.briarproject.plugins;
import org.briarproject.api.TransportId;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.system.Timer;
import java.security.SecureRandom;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -20,14 +24,19 @@ class PollerImpl implements Poller {
private final Executor ioExecutor;
private final ConnectionRegistry connectionRegistry;
private final SecureRandom random;
private final Timer timer;
private final Map<TransportId, PollTask> tasks;
@Inject
PollerImpl(@IoExecutor Executor ioExecutor,
ConnectionRegistry connectionRegistry, Timer timer) {
ConnectionRegistry connectionRegistry, SecureRandom random,
Timer timer) {
this.ioExecutor = ioExecutor;
this.connectionRegistry = connectionRegistry;
this.random = random;
this.timer = timer;
tasks = new ConcurrentHashMap<TransportId, PollTask>();
}
public void stop() {
@@ -35,17 +44,28 @@ class PollerImpl implements Poller {
}
public void addPlugin(Plugin p) {
schedule(p, true);
// Randomise first polling interval
schedule(p, randomise(p.getPollingInterval()), false);
}
private void schedule(Plugin plugin, boolean randomise) {
long interval = plugin.getPollingInterval();
// Randomise intervals at startup to spread out connection attempts
if (randomise) interval = (long) (interval * Math.random());
timer.schedule(new PollTask(plugin), interval);
public void pollNow(Plugin p) {
// Randomise next polling interval
schedule(p, 0, true);
}
public void pollNow(final Plugin p) {
private int randomise(int interval) {
return (int) (interval * random.nextDouble());
}
private void schedule(Plugin p, int interval, boolean randomiseNext) {
// Replace any previously scheduled task for this plugin
PollTask task = new PollTask(p, randomiseNext);
PollTask replaced = tasks.put(p.getId(), task);
if (replaced != null) replaced.cancel();
timer.schedule(task, interval);
}
private void poll(final Plugin p) {
ioExecutor.execute(new Runnable() {
public void run() {
if (LOG.isLoggable(INFO))
@@ -58,15 +78,20 @@ class PollerImpl implements Poller {
private class PollTask extends TimerTask {
private final Plugin plugin;
private final boolean randomiseNext;
private PollTask(Plugin plugin) {
private PollTask(Plugin plugin, boolean randomiseNext) {
this.plugin = plugin;
this.randomiseNext = randomiseNext;
}
@Override
public void run() {
pollNow(plugin);
schedule(plugin, false);
tasks.remove(plugin.getId());
int interval = plugin.getPollingInterval();
if (randomiseNext) interval = randomise(interval);
schedule(plugin, interval, false);
poll(plugin);
}
}
}

View File

@@ -1,6 +1,7 @@
package org.briarproject.plugins.tcp;
import org.briarproject.api.TransportId;
import org.briarproject.api.plugins.Backoff;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.properties.TransportProperties;
@@ -16,9 +17,9 @@ class LanTcpPlugin extends TcpPlugin {
static final TransportId ID = new TransportId("lan");
LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxLatency, int maxIdleTime, int pollingInterval) {
super(ioExecutor, callback, maxLatency, maxIdleTime, pollingInterval);
LanTcpPlugin(Executor ioExecutor, Backoff backoff,
DuplexPluginCallback callback, int maxLatency, int maxIdleTime) {
super(ioExecutor, backoff, callback, maxLatency, maxIdleTime);
}
public TransportId getId() {

View File

@@ -1,22 +1,29 @@
package org.briarproject.plugins.tcp;
import java.util.concurrent.Executor;
import org.briarproject.api.TransportId;
import org.briarproject.api.plugins.Backoff;
import org.briarproject.api.plugins.BackoffFactory;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import java.util.concurrent.Executor;
public class LanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final int MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final int POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private static final int MIN_POLLING_INTERVAL = 2 * 60 * 1000; // 2 minutes
private static final int MAX_POLLING_INTERVAL = 60 * 60 * 1000; // 1 hour
private static final double BACKOFF_BASE = 1.2;
private final Executor ioExecutor;
private final BackoffFactory backoffFactory;
public LanTcpPluginFactory(Executor ioExecutor) {
public LanTcpPluginFactory(Executor ioExecutor,
BackoffFactory backoffFactory) {
this.ioExecutor = ioExecutor;
this.backoffFactory = backoffFactory;
}
public TransportId getId() {
@@ -24,7 +31,9 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new LanTcpPlugin(ioExecutor, callback, MAX_LATENCY,
MAX_IDLE_TIME, POLLING_INTERVAL);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE);
return new LanTcpPlugin(ioExecutor, backoff, callback, MAX_LATENCY,
MAX_IDLE_TIME);
}
}

View File

@@ -2,6 +2,7 @@ package org.briarproject.plugins.tcp;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.crypto.PseudoRandom;
import org.briarproject.api.plugins.Backoff;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
@@ -36,8 +37,9 @@ abstract class TcpPlugin implements DuplexPlugin {
Logger.getLogger(TcpPlugin.class.getName());
protected final Executor ioExecutor;
protected final Backoff backoff;
protected final DuplexPluginCallback callback;
protected final int maxLatency, maxIdleTime, pollingInterval, socketTimeout;
protected final int maxLatency, maxIdleTime, socketTimeout;
protected volatile boolean running = false;
protected volatile ServerSocket socket = null;
@@ -51,13 +53,13 @@ abstract class TcpPlugin implements DuplexPlugin {
/** Returns true if connections to the given address can be attempted. */
protected abstract boolean isConnectable(InetSocketAddress remote);
protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxLatency, int maxIdleTime, int pollingInterval) {
protected TcpPlugin(Executor ioExecutor, Backoff backoff,
DuplexPluginCallback callback, int maxLatency, int maxIdleTime) {
this.ioExecutor = ioExecutor;
this.backoff = backoff;
this.callback = callback;
this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.pollingInterval = pollingInterval;
if (maxIdleTime > Integer.MAX_VALUE / 2)
socketTimeout = Integer.MAX_VALUE;
else socketTimeout = maxIdleTime * 2;
@@ -102,6 +104,7 @@ abstract class TcpPlugin implements DuplexPlugin {
return;
}
socket = ss;
backoff.reset();
SocketAddress local = ss.getLocalSocketAddress();
setLocalSocketAddress((InetSocketAddress) local);
if (LOG.isLoggable(INFO)) LOG.info("Listening on " + local);
@@ -147,6 +150,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
if (LOG.isLoggable(INFO))
LOG.info("Connection from " + s.getRemoteSocketAddress());
backoff.reset();
TcpTransportConnection conn = new TcpTransportConnection(this, s);
callback.incomingConnectionCreated(conn);
}
@@ -166,11 +170,12 @@ abstract class TcpPlugin implements DuplexPlugin {
}
public int getPollingInterval() {
return pollingInterval;
return backoff.getPollingInterval();
}
public void poll(Collection<ContactId> connected) {
if (!isRunning()) return;
backoff.increment();
for (ContactId c : callback.getRemoteProperties().keySet())
if (!connected.contains(c)) connectAndCallBack(c);
}
@@ -179,7 +184,10 @@ abstract class TcpPlugin implements DuplexPlugin {
ioExecutor.execute(new Runnable() {
public void run() {
DuplexTransportConnection d = createConnection(c);
if (d != null) callback.outgoingConnectionCreated(c, d);
if (d != null) {
backoff.reset();
callback.outgoingConnectionCreated(c, d);
}
}
});
}

View File

@@ -1,6 +1,7 @@
package org.briarproject.plugins.tcp;
import org.briarproject.api.TransportId;
import org.briarproject.api.plugins.Backoff;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.properties.TransportProperties;
@@ -20,10 +21,9 @@ class WanTcpPlugin extends TcpPlugin {
private volatile MappingResult mappingResult;
WanTcpPlugin(Executor ioExecutor, PortMapper portMapper,
DuplexPluginCallback callback, int maxLatency, int maxIdleTime,
int pollingInterval) {
super(ioExecutor, callback, maxLatency, maxIdleTime, pollingInterval);
WanTcpPlugin(Executor ioExecutor, Backoff backoff, PortMapper portMapper,
DuplexPluginCallback callback, int maxLatency, int maxIdleTime) {
super(ioExecutor, backoff, callback, maxLatency, maxIdleTime);
this.portMapper = portMapper;
}

View File

@@ -1,25 +1,31 @@
package org.briarproject.plugins.tcp;
import java.util.concurrent.Executor;
import org.briarproject.api.TransportId;
import org.briarproject.api.lifecycle.ShutdownManager;
import org.briarproject.api.plugins.Backoff;
import org.briarproject.api.plugins.BackoffFactory;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import java.util.concurrent.Executor;
public class WanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final int MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final int POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes
private static final int MIN_POLLING_INTERVAL = 2 * 60 * 1000; // 2 minutes
private static final int MAX_POLLING_INTERVAL = 60 * 60 * 1000; // 1 hour
private static final double BACKOFF_BASE = 1.2;
private final Executor ioExecutor;
private final BackoffFactory backoffFactory;
private final ShutdownManager shutdownManager;
public WanTcpPluginFactory(Executor ioExecutor,
ShutdownManager shutdownManager) {
BackoffFactory backoffFactory, ShutdownManager shutdownManager) {
this.ioExecutor = ioExecutor;
this.backoffFactory = backoffFactory;
this.shutdownManager = shutdownManager;
}
@@ -28,7 +34,10 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new WanTcpPlugin(ioExecutor, new PortMapperImpl(shutdownManager),
callback, MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL);
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE);
return new WanTcpPlugin(ioExecutor, backoff,
new PortMapperImpl(shutdownManager), callback, MAX_LATENCY,
MAX_IDLE_TIME);
}
}