Replaced Timer with ScheduledExecutorService. #258

This commit is contained in:
akwizgran
2016-05-05 11:07:58 +01:00
parent 036c5d6753
commit ff8301521c
26 changed files with 201 additions and 212 deletions

View File

@@ -10,6 +10,7 @@ import org.briarproject.messaging.MessagingModule;
import org.briarproject.plugins.PluginsModule;
import org.briarproject.properties.PropertiesModule;
import org.briarproject.sync.SyncModule;
import org.briarproject.system.SystemModule;
import org.briarproject.transport.TransportModule;
public interface CoreEagerSingletons {
@@ -34,5 +35,7 @@ public interface CoreEagerSingletons {
void inject(SyncModule.EagerSingletons init);
void inject(SystemModule.EagerSingletons init);
void inject(TransportModule.EagerSingletons init);
}

View File

@@ -61,6 +61,7 @@ public class CoreModule {
c.inject(new PluginsModule.EagerSingletons());
c.inject(new PropertiesModule.EagerSingletons());
c.inject(new SyncModule.EagerSingletons());
c.inject(new SystemModule.EagerSingletons());
c.inject(new TransportModule.EagerSingletons());
c.inject(new IntroductionModule.EagerSingletons());
}

View File

@@ -122,9 +122,6 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
public void stopService() throws ServiceException {
// Stop listening for events
eventBus.removeListener(this);
// Stop the poller
LOG.info("Stopping poller");
poller.stop();
final CountDownLatch latch = new CountDownLatch(plugins.size());
// Stop the simplex plugins
LOG.info("Stopping simplex plugins");
@@ -432,7 +429,7 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
public void transportEnabled() {
eventBus.broadcast(new TransportEnabledEvent(id));
Plugin p = plugins.get(id);
if (p != null) poller.pollNow(p);
if (p != null && p.shouldPoll()) poller.pollNow(p);
}
@Override

View File

@@ -8,12 +8,10 @@ import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.sync.SyncSessionFactory;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.KeyManager;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriterFactory;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
import javax.inject.Inject;
@@ -36,10 +34,9 @@ public class PluginsModule {
}
@Provides
Poller providePoller(@IoExecutor Executor ioExecutor,
ConnectionRegistry connectionRegistry, SecureRandom random,
Timer timer) {
return new PollerImpl(ioExecutor, connectionRegistry, random, timer);
@Singleton
Poller providePoller(PollerImpl poller) {
return poller;
}
@Provides

View File

@@ -6,7 +6,4 @@ interface Poller {
/** Tells the poller to poll the given plugin immediately. */
void pollNow(Plugin p);
/** Stops the poller. */
void stop();
}

View File

@@ -4,17 +4,17 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.system.Timer;
import java.security.SecureRandom;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.inject.Inject;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
class PollerImpl implements Poller {
@@ -23,31 +23,26 @@ class PollerImpl implements Poller {
Logger.getLogger(PollerImpl.class.getName());
private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final ConnectionRegistry connectionRegistry;
private final SecureRandom random;
private final Timer timer;
private final Map<TransportId, PollTask> tasks;
@Inject
PollerImpl(@IoExecutor Executor ioExecutor,
ConnectionRegistry connectionRegistry, SecureRandom random,
Timer timer) {
ScheduledExecutorService scheduler,
ConnectionRegistry connectionRegistry, SecureRandom random) {
this.ioExecutor = ioExecutor;
this.connectionRegistry = connectionRegistry;
this.random = random;
this.timer = timer;
this.scheduler = scheduler;
tasks = new ConcurrentHashMap<TransportId, PollTask>();
}
@Override
public void stop() {
timer.cancel();
}
@Override
public void pollNow(Plugin p) {
// Randomise next polling interval
if (p.shouldPoll()) schedule(p, 0, true);
schedule(p, 0, true);
}
private void schedule(Plugin p, int interval, boolean randomiseNext) {
@@ -55,7 +50,7 @@ class PollerImpl implements Poller {
PollTask task = new PollTask(p, randomiseNext);
PollTask replaced = tasks.put(p.getId(), task);
if (replaced != null) replaced.cancel();
timer.schedule(task, interval);
scheduler.schedule(task, interval, MILLISECONDS);
}
private void poll(final Plugin p) {
@@ -69,18 +64,25 @@ class PollerImpl implements Poller {
});
}
private class PollTask extends TimerTask {
private class PollTask implements Runnable {
private final Plugin plugin;
private final boolean randomiseNext;
private volatile boolean cancelled = false;
private PollTask(Plugin plugin, boolean randomiseNext) {
this.plugin = plugin;
this.randomiseNext = randomiseNext;
}
private void cancel() {
cancelled = true;
}
@Override
public void run() {
if (cancelled) return;
tasks.remove(plugin.getId());
int interval = plugin.getPollingInterval();
if (randomiseNext)

View File

@@ -1,23 +1,41 @@
package org.briarproject.system;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.system.Clock;
import org.briarproject.api.system.LocationUtils;
import org.briarproject.api.system.SeedProvider;
import org.briarproject.api.system.Timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import javax.inject.Singleton;
import dagger.Module;
import dagger.Provides;
@Module
public class SystemModule {
public static class EagerSingletons {
@Inject
ScheduledExecutorService scheduledExecutorService;
}
private final ScheduledExecutorService scheduler;
public SystemModule() {
scheduler = Executors.newSingleThreadScheduledExecutor();
}
@Provides
Clock provideClock() {
return new SystemClock();
}
@Provides
Timer provideTimer() {
return new SystemTimer();
@Singleton
ScheduledExecutorService provideScheduledExecutorService(
LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(scheduler);
return scheduler;
}
}

View File

@@ -1,31 +0,0 @@
package org.briarproject.system;
import java.util.TimerTask;
import org.briarproject.api.system.Timer;
/** Default timer implementation. */
public class SystemTimer implements Timer {
private final java.util.Timer timer = new java.util.Timer(true);
public void cancel() {
timer.cancel();
}
public int purge() {
return timer.purge();
}
public void schedule(TimerTask task, long delay) {
timer.schedule(task, delay);
}
public void schedule(TimerTask task, long delay, long period) {
timer.schedule(task, delay, period);
}
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
timer.scheduleAtFixedRate(task, delay, period);
}
}

View File

@@ -19,7 +19,6 @@ import org.briarproject.api.plugins.PluginConfig;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
import org.briarproject.api.system.Clock;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.KeyManager;
import org.briarproject.api.transport.StreamContext;
@@ -28,6 +27,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -42,21 +42,22 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
private final DatabaseComponent db;
private final CryptoComponent crypto;
private final Executor dbExecutor;
private final ScheduledExecutorService scheduler;
private final PluginConfig pluginConfig;
private final Timer timer;
private final Clock clock;
private final Map<ContactId, Boolean> activeContacts;
private final ConcurrentHashMap<TransportId, TransportKeyManager> managers;
@Inject
KeyManagerImpl(DatabaseComponent db, CryptoComponent crypto,
@DatabaseExecutor Executor dbExecutor, PluginConfig pluginConfig,
Timer timer, Clock clock) {
@DatabaseExecutor Executor dbExecutor,
ScheduledExecutorService scheduler, PluginConfig pluginConfig,
Clock clock) {
this.db = db;
this.crypto = crypto;
this.dbExecutor = dbExecutor;
this.scheduler = scheduler;
this.pluginConfig = pluginConfig;
this.timer = timer;
this.clock = clock;
// Use a ConcurrentHashMap as a thread-safe set
activeContacts = new ConcurrentHashMap<ContactId, Boolean>();
@@ -80,7 +81,8 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
db.addTransport(txn, e.getKey(), e.getValue());
for (Entry<TransportId, Integer> e : transports.entrySet()) {
TransportKeyManager m = new TransportKeyManager(db, crypto,
timer, clock, e.getKey(), e.getValue());
dbExecutor, scheduler, clock, e.getKey(),
e.getValue());
managers.put(e.getKey(), m);
m.start(txn);
}
@@ -97,12 +99,14 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
public void stopService() {
}
@Override
public void addContact(Transaction txn, ContactId c, SecretKey master,
long timestamp, boolean alice) throws DbException {
for (TransportKeyManager m : managers.values())
m.addContact(txn, c, master, timestamp, alice);
}
@Override
public StreamContext getStreamContext(ContactId c, TransportId t)
throws DbException {
// Don't allow outgoing streams to inactive contacts
@@ -123,6 +127,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
return ctx;
}
@Override
public StreamContext getStreamContext(TransportId t, byte[] tag)
throws DbException {
TransportKeyManager m = managers.get(t);
@@ -141,6 +146,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
return ctx;
}
@Override
public void eventOccurred(Event e) {
if (e instanceof ContactRemovedEvent) {
removeContact(((ContactRemovedEvent) e).getContactId());
@@ -154,6 +160,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
private void removeContact(final ContactId c) {
activeContacts.remove(c);
dbExecutor.execute(new Runnable() {
@Override
public void run() {
for (TransportKeyManager m : managers.values())
m.removeContact(c);

View File

@@ -9,7 +9,6 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.system.Clock;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.TransportKeys;
import org.briarproject.transport.ReorderingWindow.Change;
@@ -18,10 +17,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE;
import static org.briarproject.api.transport.TransportConstants.TAG_LENGTH;
@@ -34,7 +35,8 @@ class TransportKeyManager {
private final DatabaseComponent db;
private final CryptoComponent crypto;
private final Timer timer;
private final Executor dbExecutor;
private final ScheduledExecutorService scheduler;
private final Clock clock;
private final TransportId transportId;
private final long rotationPeriodLength;
@@ -46,11 +48,12 @@ class TransportKeyManager {
private final Map<ContactId, MutableTransportKeys> keys;
TransportKeyManager(DatabaseComponent db, CryptoComponent crypto,
Timer timer, Clock clock, TransportId transportId,
long maxLatency) {
Executor dbExecutor, ScheduledExecutorService scheduler,
Clock clock, TransportId transportId, long maxLatency) {
this.db = db;
this.crypto = crypto;
this.timer = timer;
this.dbExecutor = dbExecutor;
this.scheduler = scheduler;
this.clock = clock;
this.transportId = transportId;
rotationPeriodLength = maxLatency + MAX_CLOCK_DIFFERENCE;
@@ -122,7 +125,19 @@ class TransportKeyManager {
}
private void scheduleKeyRotation(long now) {
TimerTask task = new TimerTask() {
Runnable task = new Runnable() {
@Override
public void run() {
rotateKeys();
}
};
long delay = rotationPeriodLength - now % rotationPeriodLength;
scheduler.schedule(task, delay, MILLISECONDS);
}
private void rotateKeys() {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Transaction txn = db.startTransaction(false);
@@ -137,9 +152,7 @@ class TransportKeyManager {
LOG.log(WARNING, e.toString(), e);
}
}
};
long delay = rotationPeriodLength - now % rotationPeriodLength;
timer.schedule(task, delay);
});
}
void addContact(Transaction txn, ContactId c, SecretKey master,

View File

@@ -18,7 +18,8 @@ import dagger.Provides;
public class TransportModule {
public static class EagerSingletons {
@Inject KeyManager keyManager;
@Inject
KeyManager keyManager;
}
@Provides
@@ -35,7 +36,7 @@ public class TransportModule {
@Provides
@Singleton
KeyManager getKeyManager(LifecycleManager lifecycleManager,
KeyManager provideKeyManager(LifecycleManager lifecycleManager,
EventBus eventBus, KeyManagerImpl keyManager) {
lifecycleManager.registerService(keyManager);
eventBus.addListener(keyManager);