Fix cancellation of periodic tasks, remove ticker.

This commit is contained in:
akwizgran
2020-08-13 11:38:23 +01:00
parent 0e2d905486
commit c37fe2a246
7 changed files with 129 additions and 75 deletions

View File

@@ -17,9 +17,9 @@ import org.briarproject.bramble.api.network.event.NetworkStatusEvent;
import org.briarproject.bramble.api.nullsafety.MethodsNotNullByDefault;
import org.briarproject.bramble.api.nullsafety.ParametersNotNullByDefault;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,7 +55,7 @@ class AndroidNetworkManager implements NetworkManager, Service {
private final EventBus eventBus;
private final Executor eventExecutor;
private final Context appContext;
private final AtomicReference<Future<?>> connectivityCheck =
private final AtomicReference<Cancellable> connectivityCheck =
new AtomicReference<>();
private final AtomicBoolean used = new AtomicBoolean(false);
@@ -107,12 +107,12 @@ class AndroidNetworkManager implements NetworkManager, Service {
}
private void scheduleConnectionStatusUpdate(int delay, TimeUnit unit) {
Future<?> newConnectivityCheck =
Cancellable newConnectivityCheck =
scheduler.schedule(this::updateConnectionStatus, eventExecutor,
delay, unit);
Future<?> oldConnectivityCheck =
Cancellable oldConnectivityCheck =
connectivityCheck.getAndSet(newConnectivityCheck);
if (oldConnectivityCheck != null) oldConnectivityCheck.cancel(false);
if (oldConnectivityCheck != null) oldConnectivityCheck.cancel();
}
private class NetworkStateReceiver extends BroadcastReceiver {

View File

@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.system.AlarmListener;
import org.briarproject.bramble.api.system.AndroidWakeLockManager;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.Wakeful;
import java.util.ArrayList;
import java.util.List;
@@ -20,9 +21,9 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
@@ -35,7 +36,6 @@ import static android.content.Context.ALARM_SERVICE;
import static android.os.Build.VERSION.SDK_INT;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.system.AlarmConstants.EXTRA_PID;
@@ -48,7 +48,6 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener {
private static final Logger LOG =
getLogger(AndroidTaskScheduler.class.getName());
private static final long TICK_MS = SECONDS.toMillis(10);
private static final long ALARM_MS = INTERVAL_FIFTEEN_MINUTES;
private final Application app;
@@ -72,9 +71,6 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener {
@Override
public void startService() {
scheduledExecutorService.scheduleAtFixedRate(
() -> wakeLockManager.runWakefully(this::runDueTasks,
"TaskTicker"), TICK_MS, TICK_MS, MILLISECONDS);
scheduleAlarm();
}
@@ -84,31 +80,18 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener {
}
@Override
public Future<?> schedule(Runnable task, Executor executor, long delay,
public Cancellable schedule(Runnable task, Executor executor, long delay,
TimeUnit unit) {
long now = SystemClock.elapsedRealtime();
long dueMillis = now + MILLISECONDS.convert(delay, unit);
Runnable wakeful = () ->
wakeLockManager.executeWakefully(task, executor, "TaskHandoff");
ScheduledTask s = new ScheduledTask(wakeful, dueMillis);
if (dueMillis <= now) {
scheduledExecutorService.execute(s);
} else {
synchronized (lock) {
tasks.add(s);
}
}
return s;
AtomicBoolean cancelled = new AtomicBoolean(false);
return schedule(task, executor, delay, unit, cancelled);
}
@Override
public Future<?> scheduleWithFixedDelay(Runnable task, Executor executor,
public Cancellable scheduleWithFixedDelay(Runnable task, Executor executor,
long delay, long interval, TimeUnit unit) {
Runnable wrapped = () -> {
task.run();
scheduleWithFixedDelay(task, executor, interval, interval, unit);
};
return schedule(wrapped, executor, delay, unit);
AtomicBoolean cancelled = new AtomicBoolean(false);
return scheduleWithFixedDelay(task, executor, delay, interval, unit,
cancelled);
}
@Override
@@ -127,6 +110,39 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener {
}, "TaskAlarm");
}
private Cancellable schedule(Runnable task, Executor executor, long delay,
TimeUnit unit, AtomicBoolean cancelled) {
long now = SystemClock.elapsedRealtime();
long dueMillis = now + MILLISECONDS.convert(delay, unit);
Runnable wakeful = () ->
wakeLockManager.executeWakefully(task, executor, "TaskHandoff");
Future<?> check = scheduleCheckForDueTasks(delay, unit);
ScheduledTask s = new ScheduledTask(wakeful, dueMillis, check,
cancelled);
synchronized (lock) {
tasks.add(s);
}
return s;
}
private Cancellable scheduleWithFixedDelay(Runnable task, Executor executor,
long delay, long interval, TimeUnit unit, AtomicBoolean cancelled) {
// All executions of this periodic task share a cancelled flag
Runnable wrapped = () -> {
task.run();
scheduleWithFixedDelay(task, executor, interval, interval, unit,
cancelled);
};
return schedule(wrapped, executor, delay, unit, cancelled);
}
private Future<?> scheduleCheckForDueTasks(long delay, TimeUnit unit) {
Runnable wakeful = () -> wakeLockManager.runWakefully(
this::runDueTasks, "TaskScheduler");
return scheduledExecutorService.schedule(wakeful, delay, unit);
}
@Wakeful
private void runDueTasks() {
long now = SystemClock.elapsedRealtime();
List<ScheduledTask> due = new ArrayList<>();
@@ -182,14 +198,37 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener {
FLAG_CANCEL_CURRENT);
}
private static class ScheduledTask extends FutureTask<Void>
implements Comparable<ScheduledTask> {
private class ScheduledTask
implements Runnable, Cancellable, Comparable<ScheduledTask> {
private final Runnable task;
private final long dueMillis;
private final Future<?> check;
private final AtomicBoolean cancelled;
public ScheduledTask(Runnable runnable, long dueMillis) {
super(runnable, null);
public ScheduledTask(Runnable task, long dueMillis,
Future<?> check, AtomicBoolean cancelled) {
this.task = task;
this.dueMillis = dueMillis;
this.check = check;
this.cancelled = cancelled;
}
@Override
public void run() {
if (!cancelled.get()) task.run();
}
@Override
public void cancel() {
// Cancel any future executions of this task
cancelled.set(true);
// Cancel the scheduled check for due tasks
check.cancel(false);
// Remove the task from the queue
synchronized (lock) {
tasks.remove(this);
}
}
@Override

View File

@@ -3,7 +3,6 @@ package org.briarproject.bramble.api.system;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@@ -18,7 +17,7 @@ public interface TaskScheduler {
* If the platform supports wake locks, a wake lock will be held while
* submitting and running the task.
*/
Future<?> schedule(Runnable task, Executor executor, long delay,
Cancellable schedule(Runnable task, Executor executor, long delay,
TimeUnit unit);
/**
@@ -29,6 +28,16 @@ public interface TaskScheduler {
* If the platform supports wake locks, a wake lock will be held while
* submitting and running the task.
*/
Future<?> scheduleWithFixedDelay(Runnable task, Executor executor,
Cancellable scheduleWithFixedDelay(Runnable task, Executor executor,
long delay, long interval, TimeUnit unit);
interface Cancellable {
/**
* Cancels the task if it has not already started running. If the task
* is {@link #scheduleWithFixedDelay(Runnable, Executor, long, long, TimeUnit) periodic},
* all future executions of the task are cancelled.
*/
void cancel();
}
}

View File

@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.io.TimeoutMonitor;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.api.system.Wakeful;
import java.io.IOException;
@@ -11,7 +12,6 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
@@ -38,7 +38,7 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
private final List<TimeoutInputStream> streams = new ArrayList<>();
@GuardedBy("lock")
private Future<?> task = null;
private Cancellable cancellable = null;
@Inject
TimeoutMonitorImpl(TaskScheduler scheduler,
@@ -55,9 +55,9 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
timeoutMs, this::removeStream);
synchronized (lock) {
if (streams.isEmpty()) {
task = scheduler.scheduleWithFixedDelay(this::checkTimeouts,
ioExecutor, CHECK_INTERVAL_MS, CHECK_INTERVAL_MS,
MILLISECONDS);
cancellable = scheduler.scheduleWithFixedDelay(
this::checkTimeouts, ioExecutor, CHECK_INTERVAL_MS,
CHECK_INTERVAL_MS, MILLISECONDS);
}
streams.add(stream);
}
@@ -65,14 +65,17 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
}
private void removeStream(TimeoutInputStream stream) {
Future<?> toCancel = null;
Cancellable toCancel = null;
synchronized (lock) {
if (streams.remove(stream) && streams.isEmpty()) {
toCancel = task;
task = null;
toCancel = cancellable;
cancellable = null;
}
}
if (toCancel != null) toCancel.cancel(false);
if (toCancel != null) {
LOG.info("Cancelling timeout monitor task");
toCancel.cancel();
}
}
@IoExecutor

View File

@@ -27,6 +27,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.api.system.Wakeful;
import org.briarproject.bramble.api.system.WakefulIoExecutor;
@@ -37,7 +38,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
@@ -191,11 +191,11 @@ class PollerImpl implements Poller, EventListener {
if (scheduled == null || due < scheduled.task.due) {
// If a later task exists, cancel it. If it's already started
// it will abort safely when it finds it's been replaced
if (scheduled != null) scheduled.future.cancel(false);
if (scheduled != null) scheduled.cancellable.cancel();
PollTask task = new PollTask(p, due, randomiseNext);
Future<?> future = scheduler.schedule(task, ioExecutor, delay,
MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, future));
Cancellable cancellable = scheduler.schedule(task, ioExecutor,
delay, MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, cancellable));
}
} finally {
lock.unlock();
@@ -206,7 +206,7 @@ class PollerImpl implements Poller, EventListener {
lock.lock();
try {
ScheduledPollTask scheduled = tasks.remove(t);
if (scheduled != null) scheduled.future.cancel(false);
if (scheduled != null) scheduled.cancellable.cancel();
} finally {
lock.unlock();
}
@@ -237,11 +237,11 @@ class PollerImpl implements Poller, EventListener {
private class ScheduledPollTask {
private final PollTask task;
private final Future<?> future;
private final Cancellable cancellable;
private ScheduledPollTask(PollTask task, Future<?> future) {
private ScheduledPollTask(PollTask task, Cancellable cancellable) {
this.task = task;
this.future = future;
this.cancellable = cancellable;
}
}

View File

@@ -4,8 +4,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.system.TaskScheduler;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
@@ -24,17 +24,20 @@ class TaskSchedulerImpl implements TaskScheduler {
}
@Override
public Future<?> schedule(Runnable task, Executor executor, long delay,
public Cancellable schedule(Runnable task, Executor executor, long delay,
TimeUnit unit) {
Runnable execute = () -> executor.execute(task);
return scheduledExecutorService.schedule(execute, delay, unit);
ScheduledFuture<?> future =
scheduledExecutorService.schedule(execute, delay, unit);
return () -> future.cancel(false);
}
@Override
public Future<?> scheduleWithFixedDelay(Runnable task, Executor executor,
public Cancellable scheduleWithFixedDelay(Runnable task, Executor executor,
long delay, long interval, TimeUnit unit) {
Runnable execute = () -> executor.execute(task);
return scheduledExecutorService.scheduleWithFixedDelay(execute, delay,
interval, unit);
ScheduledFuture<?> future = scheduledExecutorService.
scheduleWithFixedDelay(execute, delay, interval, unit);
return () -> future.cancel(false);
}
}

View File

@@ -20,6 +20,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.ImmediateExecutor;
import org.briarproject.bramble.test.RunAction;
@@ -31,7 +32,6 @@ import org.junit.Test;
import java.security.SecureRandom;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
@@ -55,7 +55,7 @@ public class PollerImplTest extends BrambleMockTestCase {
private final TransportPropertyManager transportPropertyManager =
context.mock(TransportPropertyManager.class);
private final Clock clock = context.mock(Clock.class);
private final Future<?> future = context.mock(Future.class);
private final Cancellable cancellable = context.mock(Cancellable.class);
private final SecureRandom random;
private final Executor ioExecutor = new ImmediateExecutor();
@@ -237,7 +237,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
}});
poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
@@ -266,7 +266,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Second event
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
@@ -309,7 +309,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Second event
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
@@ -322,7 +322,7 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(pollingInterval - 2));
oneOf(clock).currentTimeMillis();
will(returnValue(now + 1));
oneOf(future).cancel(false);
oneOf(cancellable).cancel();
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval - 2),
with(MILLISECONDS));
@@ -352,7 +352,7 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
will(new RunAction());
// Running the polling task schedules the next polling task
oneOf(plugin).getPollingInterval();
@@ -364,7 +364,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Get the transport properties and connected contacts
oneOf(transportPropertyManager).getRemoteProperties(transportId);
will(returnValue(singletonMap(contactId, properties)));
@@ -396,7 +396,7 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
will(new RunAction());
// Running the polling task schedules the next polling task
oneOf(plugin).getPollingInterval();
@@ -408,7 +408,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// Get the transport properties and connected contacts
oneOf(transportPropertyManager).getRemoteProperties(transportId);
will(returnValue(singletonMap(contactId, properties)));
@@ -438,9 +438,9 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
// The plugin is deactivated before the task runs - cancel the task
oneOf(future).cancel(false);
oneOf(cancellable).cancel();
}});
poller.eventOccurred(new TransportActiveEvent(transportId));
@@ -463,7 +463,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval),
with(MILLISECONDS));
will(returnValue(future));
will(returnValue(cancellable));
}});
}