Pass executor to scheduler.

This commit is contained in:
akwizgran
2020-08-06 13:11:12 +01:00
parent d5395d3d01
commit 3aa00ecb3d
14 changed files with 128 additions and 92 deletions

View File

@@ -55,7 +55,8 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
synchronized (lock) {
if (streams.isEmpty()) {
task = scheduler.scheduleWithFixedDelay(this::checkTimeouts,
CHECK_INTERVAL_MS, CHECK_INTERVAL_MS, MILLISECONDS);
ioExecutor, CHECK_INTERVAL_MS, CHECK_INTERVAL_MS,
MILLISECONDS);
}
streams.add(stream);
}
@@ -73,23 +74,21 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
if (toCancel != null) toCancel.cancel(false);
}
// Scheduler
@IoExecutor
private void checkTimeouts() {
ioExecutor.execute(() -> {
List<TimeoutInputStream> snapshot;
synchronized (lock) {
snapshot = new ArrayList<>(streams);
}
for (TimeoutInputStream stream : snapshot) {
if (stream.hasTimedOut()) {
LOG.info("Input stream has timed out");
try {
stream.close();
} catch (IOException e) {
logException(LOG, INFO, e);
}
List<TimeoutInputStream> snapshot;
synchronized (lock) {
snapshot = new ArrayList<>(streams);
}
for (TimeoutInputStream stream : snapshot) {
if (stream.hasTimedOut()) {
LOG.info("Input stream has timed out");
try {
stream.close();
} catch (IOException e) {
logException(LOG, INFO, e);
}
}
});
}
}
}

View File

@@ -118,6 +118,7 @@ class PollerImpl implements Poller, EventListener {
}
}
// TODO: Make this wakeful
private void connectToContact(ContactId c) {
for (SimplexPlugin s : pluginManager.getSimplexPlugins())
if (s.shouldPoll()) connectToContact(c, s);
@@ -189,8 +190,8 @@ class PollerImpl implements Poller, EventListener {
// it will abort safely when it finds it's been replaced
if (scheduled != null) scheduled.future.cancel(false);
PollTask task = new PollTask(p, due, randomiseNext);
Future<?> future = scheduler.schedule(() ->
ioExecutor.execute(task), delay, MILLISECONDS);
Future<?> future = scheduler.schedule(task, ioExecutor, delay,
MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, future));
}
} finally {
@@ -233,9 +234,9 @@ class PollerImpl implements Poller, EventListener {
private class ScheduledPollTask {
private final PollTask task;
private final Future future;
private final Future<?> future;
private ScheduledPollTask(PollTask task, Future future) {
private ScheduledPollTask(PollTask task, Future<?> future) {
this.task = task;
this.future = future;
}

View File

@@ -143,8 +143,8 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
} catch (DbException e) {
throw new ServiceException(e);
}
scheduler.scheduleWithFixedDelay(this::poll, POLLING_INTERVAL_MS,
POLLING_INTERVAL_MS, MILLISECONDS);
scheduler.scheduleWithFixedDelay(this::poll, worker,
POLLING_INTERVAL_MS, POLLING_INTERVAL_MS, MILLISECONDS);
}
@EventExecutor
@@ -204,12 +204,10 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
return plugin.createRendezvousEndpoint(k, cs.alice, h);
}
// Scheduler
// Worker
private void poll() {
worker.execute(() -> {
removeExpiredPendingContacts();
for (PluginState ps : pluginStates.values()) poll(ps);
});
removeExpiredPendingContacts();
for (PluginState ps : pluginStates.values()) poll(ps);
}
// Worker

View File

@@ -3,6 +3,7 @@ package org.briarproject.bramble.system;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.system.TaskScheduler;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -10,8 +11,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
/**
* A {@link TaskScheduler} that delegates all calls to a
* {@link ScheduledExecutorService}.
* A {@link TaskScheduler} that uses a {@link ScheduledExecutorService}.
*/
@ThreadSafe
@NotNullByDefault
@@ -24,13 +24,16 @@ class TaskSchedulerImpl implements TaskScheduler {
}
@Override
public Future<?> schedule(Runnable task, long delay, TimeUnit unit) {
return delegate.schedule(task, delay, unit);
public Future<?> schedule(Runnable task, Executor executor, long delay,
TimeUnit unit) {
Runnable execute = () -> executor.execute(task);
return delegate.schedule(execute, delay, unit);
}
@Override
public Future<?> scheduleWithFixedDelay(Runnable task, long delay,
long interval, TimeUnit unit) {
return delegate.scheduleWithFixedDelay(task, delay, interval, unit);
public Future<?> scheduleWithFixedDelay(Runnable task, Executor executor,
long delay, long interval, TimeUnit unit) {
Runnable execute = () -> executor.execute(task);
return delegate.scheduleWithFixedDelay(execute, delay, interval, unit);
}
}

View File

@@ -6,6 +6,7 @@ import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.crypto.TransportCrypto;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
@@ -198,17 +199,16 @@ class TransportKeyManagerImpl implements TransportKeyManager {
private void scheduleKeyUpdate(long now) {
long delay = timePeriodLength - now % timePeriodLength;
scheduler.schedule(this::updateKeys, delay, MILLISECONDS);
scheduler.schedule(this::updateKeys, dbExecutor, delay, MILLISECONDS);
}
@DatabaseExecutor
private void updateKeys() {
dbExecutor.execute(() -> {
try {
db.transaction(false, this::updateKeys);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
});
try {
db.transaction(false, this::updateKeys);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
}
@Override

View File

@@ -234,7 +234,8 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
}});
@@ -262,7 +263,8 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
// Second event
// Get the plugin
@@ -304,7 +306,8 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
// Second event
// Get the plugin
@@ -320,7 +323,8 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now + 1));
oneOf(future).cancel(false);
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval - 2), with(MILLISECONDS));
with(ioExecutor), with((long) pollingInterval - 2),
with(MILLISECONDS));
}});
poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
@@ -345,8 +349,8 @@ public class PollerImplTest extends BrambleMockTestCase {
// Schedule a polling task immediately
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L),
with(MILLISECONDS));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(new RunAction());
// Running the polling task schedules the next polling task
@@ -357,7 +361,8 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) (pollingInterval * 0.5)), with(MILLISECONDS));
with(ioExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS));
will(returnValue(future));
// Get the transport properties and connected contacts
oneOf(transportPropertyManager).getRemoteProperties(transportId);
@@ -388,8 +393,8 @@ public class PollerImplTest extends BrambleMockTestCase {
// Schedule a polling task immediately
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L),
with(MILLISECONDS));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(new RunAction());
// Running the polling task schedules the next polling task
@@ -400,7 +405,8 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) (pollingInterval * 0.5)), with(MILLISECONDS));
with(ioExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS));
will(returnValue(future));
// Get the transport properties and connected contacts
oneOf(transportPropertyManager).getRemoteProperties(transportId);
@@ -429,8 +435,8 @@ public class PollerImplTest extends BrambleMockTestCase {
// Schedule a polling task immediately
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L),
with(MILLISECONDS));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
// The plugin is deactivated before the task runs - cancel the task
oneOf(future).cancel(false);
@@ -454,7 +460,8 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
}});
}

View File

@@ -123,8 +123,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
e.getPendingContactState() == OFFLINE)));
// Capture the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});
@@ -159,8 +159,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
e.getPendingContactState() == FAILED)));
// Schedule the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
}});
rendezvousPoller.startService();
@@ -468,8 +468,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
will(returnValue(emptyList()));
// Capture the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});
@@ -545,8 +545,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
e.getPendingContactState() == OFFLINE)));
// Capture the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});

View File

@@ -117,7 +117,8 @@ public class TransportKeyManagerImplTest extends BrambleMockTestCase {
new TransportKeySet(keySetId, contactId, null, updated)));
// Schedule a key update at the start of the next time period
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(timePeriodLength - 1), with(MILLISECONDS));
with(dbExecutor), with(timePeriodLength - 1),
with(MILLISECONDS));
}});
transportKeyManager.start(txn);
@@ -420,7 +421,8 @@ public class TransportKeyManagerImplTest extends BrambleMockTestCase {
}
// Schedule a key update at the start of the next time period
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(timePeriodLength), with(MILLISECONDS));
with(dbExecutor), with(timePeriodLength),
with(MILLISECONDS));
will(new RunAction());
oneOf(dbExecutor).execute(with(any(Runnable.class)));
will(new RunAction());
@@ -445,7 +447,8 @@ public class TransportKeyManagerImplTest extends BrambleMockTestCase {
new TransportKeySet(keySetId, contactId, null, updated)));
// Schedule a key update at the start of the next time period
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(timePeriodLength), with(MILLISECONDS));
with(dbExecutor), with(timePeriodLength),
with(MILLISECONDS));
}});
transportKeyManager.start(txn);