Poll plugins when connectivity changes. Bug #66.

This should enable us to connect to contacts faster at startup and
whenever a new means of connecting becomes available.
This commit is contained in:
akwizgran
2014-04-04 22:04:05 +01:00
parent 9f8d12e6c2
commit 08b91d2483
11 changed files with 113 additions and 136 deletions

View File

@@ -172,6 +172,7 @@ class DroidtoothPlugin implements DuplexPlugin {
}
LOG.info("Socket bound");
socket = ss;
callback.pollNow();
acceptContactConnections();
}
});

View File

@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.zip.ZipInputStream;
@@ -80,6 +81,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
private final long maxLatency, pollingInterval;
private final File torDirectory, torFile, geoIpFile, configFile, doneFile;
private final File cookieFile, pidFile, hostnameFile;
private final AtomicBoolean firstCircuit;
private volatile boolean running = false, networkEnabled = false;
private volatile Process tor = null;
@@ -109,6 +111,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
cookieFile = new File(torDirectory, ".tor/control_auth_cookie");
pidFile = new File(torDirectory, ".tor/pid");
hostnameFile = new File(torDirectory, "hostname");
firstCircuit = new AtomicBoolean(true);
}
public TransportId getId() {
@@ -216,11 +219,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
IntentFilter filter = new IntentFilter(CONNECTIVITY_ACTION);
appContext.registerReceiver(networkStateReceiver, filter);
// Bind a server socket to receive incoming hidden service connections
pluginExecutor.execute(new Runnable() {
public void run() {
bind();
}
});
bind();
return true;
}
@@ -428,38 +427,43 @@ class TorPlugin implements DuplexPlugin, EventHandler {
}
private void bind() {
// If there's already a port number stored in config, reuse it
String portString = callback.getConfig().get("port");
int port;
if(StringUtils.isNullOrEmpty(portString)) port = 0;
else port = Integer.parseInt(portString);
// Bind a server socket to receive connections from the Tor process
ServerSocket ss = null;
try {
ss = new ServerSocket();
ss.bind(new InetSocketAddress("127.0.0.1", port));
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
tryToClose(ss);
}
if(!running) {
tryToClose(ss);
return;
}
socket = ss;
// Store the port number
final String localPort = String.valueOf(ss.getLocalPort());
TransportConfig c = new TransportConfig();
c.put("port", localPort);
callback.mergeConfig(c);
// Create a hidden service if necessary
pluginExecutor.execute(new Runnable() {
public void run() {
publishHiddenService(localPort);
// If there's already a port number stored in config, reuse it
String portString = callback.getConfig().get("port");
int port;
if(StringUtils.isNullOrEmpty(portString)) port = 0;
else port = Integer.parseInt(portString);
// Bind a server socket to receive connections from Tor
ServerSocket ss = null;
try {
ss = new ServerSocket();
ss.bind(new InetSocketAddress("127.0.0.1", port));
} catch(IOException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
tryToClose(ss);
}
if(!running) {
tryToClose(ss);
return;
}
socket = ss;
// Store the port number
final String localPort = String.valueOf(ss.getLocalPort());
TransportConfig c = new TransportConfig();
c.put("port", localPort);
callback.mergeConfig(c);
// Create a hidden service if necessary
pluginExecutor.execute(new Runnable() {
public void run() {
publishHiddenService(localPort);
}
});
// Accept incoming hidden service connections from Tor
acceptContactConnections(ss);
}
});
// Accept incoming hidden service connections from the Tor process
acceptContactConnections(ss);
}
private void tryToClose(ServerSocket ss) {
@@ -534,6 +538,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
private void enableNetwork(boolean enable) throws IOException {
if(!running) return;
if(LOG.isLoggable(INFO)) LOG.info("Enabling network: " + enable);
if(!enable) firstCircuit.set(true);
controlConnection.setConf("DisableNetwork", enable ? "0" : "1");
networkEnabled = enable;
}
@@ -629,6 +634,10 @@ class TorPlugin implements DuplexPlugin, EventHandler {
if(!"EXTENDED".equals(status))
LOG.info("Circuit " + id + " " + status);
}
if("BUILT".equals(status) && firstCircuit.getAndSet(false)) {
LOG.info("First circuit built");
callback.pollNow();
}
}
public void streamStatus(String status, String id, String target) {

View File

@@ -49,4 +49,7 @@ public interface PluginCallback {
* format string and arguments.
*/
void showMessage(String... message);
/** Schedules the plugin to be polled immediately. */
void pollNow();
}

View File

@@ -363,6 +363,11 @@ class PluginManagerImpl implements PluginManager {
public void showMessage(String... message) {
uiCallback.showMessage(message);
}
public void pollNow() {
Plugin p = plugins.get(id);
if(p != null) poller.pollNow(p);
}
}
private class SimplexCallback extends PluginCallbackImpl

View File

@@ -6,9 +6,12 @@ import org.briarproject.api.plugins.Plugin;
interface Poller {
/** Starts a new thread to poll the given collection of plugins. */
/** Starts a poller for the given collection of plugins. */
void start(Collection<Plugin> plugins);
/** Tells the poller thread to exit. */
/** Stops the poller. */
void stop();
/** Tells the poller to poll the given plugin immediately. */
void pollNow(Plugin p);
}

View File

@@ -3,125 +3,72 @@ package org.briarproject.plugins;
import static java.util.logging.Level.INFO;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.system.Clock;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.ConnectionRegistry;
class PollerImpl implements Poller, Runnable {
class PollerImpl implements Poller {
private static final Logger LOG =
Logger.getLogger(PollerImpl.class.getName());
private final Executor pluginExecutor;
private final ConnectionRegistry connRegistry;
private final Clock clock;
private final SortedSet<PollTime> pollTimes;
private final Timer timer;
@Inject
PollerImpl(@PluginExecutor Executor pluginExecutor,
ConnectionRegistry connRegistry, Clock clock) {
ConnectionRegistry connRegistry, Timer timer) {
this.pluginExecutor = pluginExecutor;
this.connRegistry = connRegistry;
this.clock = clock;
pollTimes = new TreeSet<PollTime>();
this.timer = timer;
}
public synchronized void start(Collection<Plugin> plugins) {
public void start(Collection<Plugin> plugins) {
for(Plugin plugin : plugins) schedule(plugin, true);
new Thread(this, "Poller").start();
}
private synchronized void schedule(Plugin plugin, boolean randomise) {
private void schedule(Plugin plugin, boolean randomise) {
if(plugin.shouldPoll()) {
long now = clock.currentTimeMillis();
long interval = plugin.getPollingInterval();
// Randomise intervals at startup to spread out connection attempts
if(randomise) interval = (long) (interval * Math.random());
pollTimes.add(new PollTime(now + interval, plugin));
timer.schedule(new PollTask(plugin), interval);
}
}
public synchronized void stop() {
pollTimes.clear();
notifyAll();
public void stop() {
timer.cancel();
}
public void run() {
while(true) {
synchronized(this) {
if(pollTimes.isEmpty()) {
LOG.info("Finished polling");
return;
}
long now = clock.currentTimeMillis();
final PollTime p = pollTimes.first();
if(now >= p.time) {
boolean removed = pollTimes.remove(p);
assert removed;
final Collection<ContactId> connected =
connRegistry.getConnectedContacts(p.plugin.getId());
if(LOG.isLoggable(INFO)) {
String name = p.plugin.getClass().getSimpleName();
LOG.info("Polling " + name);
}
pluginExecutor.execute(new Runnable() {
public void run() {
p.plugin.poll(connected);
}
});
schedule(p.plugin, false);
} else {
try {
wait(p.time - now);
} catch(InterruptedException e) {
LOG.warning("Interrupted while waiting to poll");
Thread.currentThread().interrupt();
return;
}
}
public void pollNow(final Plugin p) {
pluginExecutor.execute(new Runnable() {
public void run() {
if(LOG.isLoggable(INFO))
LOG.info("Polling " + p.getClass().getSimpleName());
p.poll(connRegistry.getConnectedContacts(p.getId()));
}
}
});
}
private static class PollTime implements Comparable<PollTime> {
private class PollTask extends TimerTask {
private final long time;
private final Plugin plugin;
private PollTime(long time, Plugin plugin) {
this.time = time;
private PollTask(Plugin plugin) {
this.plugin = plugin;
}
// Must be consistent with equals()
public int compareTo(PollTime p) {
if(time < p.time) return -1;
if(time > p.time) return 1;
return 0;
}
// Must be consistent with equals()
@Override
public int hashCode() {
return (int) (time ^ (time >>> 32)) ^ plugin.hashCode();
}
@Override
public boolean equals(Object o) {
if(o instanceof PollTime) {
PollTime p = (PollTime) o;
return time == p.time && plugin == p.plugin;
}
return false;
public void run() {
pollNow(plugin);
schedule(plugin, false);
}
}
}

View File

@@ -98,6 +98,7 @@ abstract class TcpPlugin implements DuplexPlugin {
SocketAddress local = ss.getLocalSocketAddress();
setLocalSocketAddress((InetSocketAddress) local);
if(LOG.isLoggable(INFO)) LOG.info("Listening on " + local);
callback.pollNow();
acceptContactConnections();
}
});

View File

@@ -91,35 +91,37 @@ class BluetoothPlugin implements DuplexPlugin {
if(LOG.isLoggable(INFO))
LOG.info("Local address " + localDevice.getBluetoothAddress());
running = true;
pluginExecutor.execute(new Runnable() {
public void run() {
bind();
}
});
bind();
return true;
}
private void bind() {
if(!running) return;
// Advertise the Bluetooth address to contacts
TransportProperties p = new TransportProperties();
p.put("address", localDevice.getBluetoothAddress());
callback.mergeLocalProperties(p);
// Bind a server socket to accept connections from contacts
String url = makeUrl("localhost", getUuid());
StreamConnectionNotifier ss;
try {
ss = (StreamConnectionNotifier) Connector.open(url);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return;
}
if(!running) {
tryToClose(ss);
return;
}
socket = ss;
acceptContactConnections(ss);
pluginExecutor.execute(new Runnable() {
public void run() {
if(!running) return;
// Advertise the Bluetooth address to contacts
TransportProperties p = new TransportProperties();
p.put("address", localDevice.getBluetoothAddress());
callback.mergeLocalProperties(p);
// Bind a server socket to accept connections from contacts
String url = makeUrl("localhost", getUuid());
StreamConnectionNotifier ss;
try {
ss = (StreamConnectionNotifier) Connector.open(url);
} catch(IOException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
return;
}
if(!running) {
tryToClose(ss);
return;
}
socket = ss;
callback.pollNow();
acceptContactConnections(ss);
}
});
}
private String makeUrl(String address, String uuid) {

View File

@@ -100,6 +100,8 @@ public abstract class DuplexClientTest extends DuplexTest {
public void showMessage(String... message) {}
public void pollNow() {}
public void incomingConnectionCreated(DuplexTransportConnection d) {}
public void outgoingConnectionCreated(ContactId contactId,

View File

@@ -99,6 +99,8 @@ public abstract class DuplexServerTest extends DuplexTest {
public void showMessage(String... message) {}
public void pollNow() {}
public void incomingConnectionCreated(DuplexTransportConnection d) {
System.out.println("Connection received");
sendChallengeReceiveResponse(d);

View File

@@ -133,6 +133,8 @@ public class LanTcpPluginTest extends BriarTestCase {
public void showMessage(String... message) {}
public void pollNow() {}
public void incomingConnectionCreated(DuplexTransportConnection d) {
connectionsLatch.countDown();
}