Merge branch '1759-fix-periodic-task-cancellation' into 'master'

Fix cancellation of periodic tasks, remove ticker

Closes #1759

See merge request briar/briar!1274
This commit is contained in:
Torsten Grote
2020-08-14 12:47:20 +00:00
7 changed files with 129 additions and 75 deletions

View File

@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.io.TimeoutMonitor;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.api.system.Wakeful;
import java.io.IOException;
@@ -11,7 +12,6 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
@@ -38,7 +38,7 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
private final List<TimeoutInputStream> streams = new ArrayList<>();
@GuardedBy("lock")
private Future<?> task = null;
private Cancellable cancellable = null;
@Inject
TimeoutMonitorImpl(TaskScheduler scheduler,
@@ -55,9 +55,9 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
timeoutMs, this::removeStream);
synchronized (lock) {
if (streams.isEmpty()) {
task = scheduler.scheduleWithFixedDelay(this::checkTimeouts,
ioExecutor, CHECK_INTERVAL_MS, CHECK_INTERVAL_MS,
MILLISECONDS);
cancellable = scheduler.scheduleWithFixedDelay(
this::checkTimeouts, ioExecutor, CHECK_INTERVAL_MS,
CHECK_INTERVAL_MS, MILLISECONDS);
}
streams.add(stream);
}
@@ -65,14 +65,17 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
}
private void removeStream(TimeoutInputStream stream) {
Future<?> toCancel = null;
Cancellable toCancel = null;
synchronized (lock) {
if (streams.remove(stream) && streams.isEmpty()) {
toCancel = task;
task = null;
toCancel = cancellable;
cancellable = null;
}
}
if (toCancel != null) toCancel.cancel(false);
if (toCancel != null) {
LOG.info("Cancelling timeout monitor task");
toCancel.cancel();
}
}
@IoExecutor

View File

@@ -27,6 +27,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.api.system.Wakeful;
import org.briarproject.bramble.api.system.WakefulIoExecutor;
@@ -37,7 +38,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
@@ -191,11 +191,11 @@ class PollerImpl implements Poller, EventListener {
if (scheduled == null || due < scheduled.task.due) {
// If a later task exists, cancel it. If it's already started
// it will abort safely when it finds it's been replaced
if (scheduled != null) scheduled.future.cancel(false);
if (scheduled != null) scheduled.cancellable.cancel();
PollTask task = new PollTask(p, due, randomiseNext);
Future<?> future = scheduler.schedule(task, ioExecutor, delay,
MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, future));
Cancellable cancellable = scheduler.schedule(task, ioExecutor,
delay, MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, cancellable));
}
} finally {
lock.unlock();
@@ -206,7 +206,7 @@ class PollerImpl implements Poller, EventListener {
lock.lock();
try {
ScheduledPollTask scheduled = tasks.remove(t);
if (scheduled != null) scheduled.future.cancel(false);
if (scheduled != null) scheduled.cancellable.cancel();
} finally {
lock.unlock();
}
@@ -237,11 +237,11 @@ class PollerImpl implements Poller, EventListener {
private class ScheduledPollTask {
private final PollTask task;
private final Future<?> future;
private final Cancellable cancellable;
private ScheduledPollTask(PollTask task, Future<?> future) {
private ScheduledPollTask(PollTask task, Cancellable cancellable) {
this.task = task;
this.future = future;
this.cancellable = cancellable;
}
}

View File

@@ -4,8 +4,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.system.TaskScheduler;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
@@ -24,17 +24,20 @@ class TaskSchedulerImpl implements TaskScheduler {
}
@Override
public Future<?> schedule(Runnable task, Executor executor, long delay,
public Cancellable schedule(Runnable task, Executor executor, long delay,
TimeUnit unit) {
Runnable execute = () -> executor.execute(task);
return scheduledExecutorService.schedule(execute, delay, unit);
ScheduledFuture<?> future =
scheduledExecutorService.schedule(execute, delay, unit);
return () -> future.cancel(false);
}
@Override
public Future<?> scheduleWithFixedDelay(Runnable task, Executor executor,
public Cancellable scheduleWithFixedDelay(Runnable task, Executor executor,
long delay, long interval, TimeUnit unit) {
Runnable execute = () -> executor.execute(task);
return scheduledExecutorService.scheduleWithFixedDelay(execute, delay,
interval, unit);
ScheduledFuture<?> future = scheduledExecutorService.
scheduleWithFixedDelay(execute, delay, interval, unit);
return () -> future.cancel(false);
}
}

View File

@@ -20,6 +20,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.ImmediateExecutor;
import org.briarproject.bramble.test.RunAction;
@@ -31,7 +32,6 @@ import org.junit.Test;
import java.security.SecureRandom;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
@@ -55,7 +55,7 @@ public class PollerImplTest extends BrambleMockTestCase {
private final TransportPropertyManager transportPropertyManager =
context.mock(TransportPropertyManager.class);
private final Clock clock = context.mock(Clock.class);
private final Future<?> future = context.mock(Future.class);
private final Cancellable cancellable = context.mock(Cancellable.class);
private final SecureRandom random;
private final Executor ioExecutor = new ImmediateExecutor();
@@ -237,7 +237,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
}});
poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
@@ -266,7 +266,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Second event
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
@@ -309,7 +309,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Second event
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
@@ -322,7 +322,7 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(pollingInterval - 2));
oneOf(clock).currentTimeMillis();
will(returnValue(now + 1));
oneOf(future).cancel(false);
oneOf(cancellable).cancel();
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval - 2),
with(MILLISECONDS));
@@ -352,7 +352,7 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
will(new RunAction());
// Running the polling task schedules the next polling task
oneOf(plugin).getPollingInterval();
@@ -364,7 +364,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Get the transport properties and connected contacts
oneOf(transportPropertyManager).getRemoteProperties(transportId);
will(returnValue(singletonMap(contactId, properties)));
@@ -396,7 +396,7 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
will(new RunAction());
// Running the polling task schedules the next polling task
oneOf(plugin).getPollingInterval();
@@ -408,7 +408,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Get the transport properties and connected contacts
oneOf(transportPropertyManager).getRemoteProperties(transportId);
will(returnValue(singletonMap(contactId, properties)));
@@ -438,9 +438,9 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// The plugin is deactivated before the task runs - cancel the task
oneOf(future).cancel(false);
oneOf(cancellable).cancel();
}});
poller.eventOccurred(new TransportActiveEvent(transportId));
@@ -463,7 +463,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
}});
}