diff --git a/bramble-android/src/main/java/org/briarproject/bramble/network/AndroidNetworkManager.java b/bramble-android/src/main/java/org/briarproject/bramble/network/AndroidNetworkManager.java index f6c238600..3ae99173a 100644 --- a/bramble-android/src/main/java/org/briarproject/bramble/network/AndroidNetworkManager.java +++ b/bramble-android/src/main/java/org/briarproject/bramble/network/AndroidNetworkManager.java @@ -17,9 +17,9 @@ import org.briarproject.bramble.api.network.event.NetworkStatusEvent; import org.briarproject.bramble.api.nullsafety.MethodsNotNullByDefault; import org.briarproject.bramble.api.nullsafety.ParametersNotNullByDefault; import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.api.system.TaskScheduler.Cancellable; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -55,7 +55,7 @@ class AndroidNetworkManager implements NetworkManager, Service { private final EventBus eventBus; private final Executor eventExecutor; private final Context appContext; - private final AtomicReference> connectivityCheck = + private final AtomicReference connectivityCheck = new AtomicReference<>(); private final AtomicBoolean used = new AtomicBoolean(false); @@ -107,12 +107,12 @@ class AndroidNetworkManager implements NetworkManager, Service { } private void scheduleConnectionStatusUpdate(int delay, TimeUnit unit) { - Future newConnectivityCheck = + Cancellable newConnectivityCheck = scheduler.schedule(this::updateConnectionStatus, eventExecutor, delay, unit); - Future oldConnectivityCheck = + Cancellable oldConnectivityCheck = connectivityCheck.getAndSet(newConnectivityCheck); - if (oldConnectivityCheck != null) oldConnectivityCheck.cancel(false); + if (oldConnectivityCheck != null) oldConnectivityCheck.cancel(); } private class NetworkStateReceiver extends BroadcastReceiver { diff --git a/bramble-android/src/main/java/org/briarproject/bramble/system/AndroidTaskScheduler.java b/bramble-android/src/main/java/org/briarproject/bramble/system/AndroidTaskScheduler.java index 55714cb00..639824991 100644 --- a/bramble-android/src/main/java/org/briarproject/bramble/system/AndroidTaskScheduler.java +++ b/bramble-android/src/main/java/org/briarproject/bramble/system/AndroidTaskScheduler.java @@ -13,6 +13,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.system.AlarmListener; import org.briarproject.bramble.api.system.AndroidWakeLockManager; import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.api.system.Wakeful; import java.util.ArrayList; import java.util.List; @@ -20,9 +21,9 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; @@ -35,7 +36,6 @@ import static android.content.Context.ALARM_SERVICE; import static android.os.Build.VERSION.SDK_INT; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.logging.Level.INFO; import static java.util.logging.Logger.getLogger; import static org.briarproject.bramble.system.AlarmConstants.EXTRA_PID; @@ -48,7 +48,6 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener { private static final Logger LOG = getLogger(AndroidTaskScheduler.class.getName()); - private static final long TICK_MS = SECONDS.toMillis(10); private static final long ALARM_MS = INTERVAL_FIFTEEN_MINUTES; private final Application app; @@ -72,9 +71,6 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener { @Override public void startService() { - scheduledExecutorService.scheduleAtFixedRate( - () -> wakeLockManager.runWakefully(this::runDueTasks, - "TaskTicker"), TICK_MS, TICK_MS, MILLISECONDS); scheduleAlarm(); } @@ -84,31 +80,18 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener { } @Override - public Future schedule(Runnable task, Executor executor, long delay, + public Cancellable schedule(Runnable task, Executor executor, long delay, TimeUnit unit) { - long now = SystemClock.elapsedRealtime(); - long dueMillis = now + MILLISECONDS.convert(delay, unit); - Runnable wakeful = () -> - wakeLockManager.executeWakefully(task, executor, "TaskHandoff"); - ScheduledTask s = new ScheduledTask(wakeful, dueMillis); - if (dueMillis <= now) { - scheduledExecutorService.execute(s); - } else { - synchronized (lock) { - tasks.add(s); - } - } - return s; + AtomicBoolean cancelled = new AtomicBoolean(false); + return schedule(task, executor, delay, unit, cancelled); } @Override - public Future scheduleWithFixedDelay(Runnable task, Executor executor, + public Cancellable scheduleWithFixedDelay(Runnable task, Executor executor, long delay, long interval, TimeUnit unit) { - Runnable wrapped = () -> { - task.run(); - scheduleWithFixedDelay(task, executor, interval, interval, unit); - }; - return schedule(wrapped, executor, delay, unit); + AtomicBoolean cancelled = new AtomicBoolean(false); + return scheduleWithFixedDelay(task, executor, delay, interval, unit, + cancelled); } @Override @@ -127,6 +110,39 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener { }, "TaskAlarm"); } + private Cancellable schedule(Runnable task, Executor executor, long delay, + TimeUnit unit, AtomicBoolean cancelled) { + long now = SystemClock.elapsedRealtime(); + long dueMillis = now + MILLISECONDS.convert(delay, unit); + Runnable wakeful = () -> + wakeLockManager.executeWakefully(task, executor, "TaskHandoff"); + Future check = scheduleCheckForDueTasks(delay, unit); + ScheduledTask s = new ScheduledTask(wakeful, dueMillis, check, + cancelled); + synchronized (lock) { + tasks.add(s); + } + return s; + } + + private Cancellable scheduleWithFixedDelay(Runnable task, Executor executor, + long delay, long interval, TimeUnit unit, AtomicBoolean cancelled) { + // All executions of this periodic task share a cancelled flag + Runnable wrapped = () -> { + task.run(); + scheduleWithFixedDelay(task, executor, interval, interval, unit, + cancelled); + }; + return schedule(wrapped, executor, delay, unit, cancelled); + } + + private Future scheduleCheckForDueTasks(long delay, TimeUnit unit) { + Runnable wakeful = () -> wakeLockManager.runWakefully( + this::runDueTasks, "TaskScheduler"); + return scheduledExecutorService.schedule(wakeful, delay, unit); + } + + @Wakeful private void runDueTasks() { long now = SystemClock.elapsedRealtime(); List due = new ArrayList<>(); @@ -182,14 +198,37 @@ class AndroidTaskScheduler implements TaskScheduler, Service, AlarmListener { FLAG_CANCEL_CURRENT); } - private static class ScheduledTask extends FutureTask - implements Comparable { + private class ScheduledTask + implements Runnable, Cancellable, Comparable { + private final Runnable task; private final long dueMillis; + private final Future check; + private final AtomicBoolean cancelled; - public ScheduledTask(Runnable runnable, long dueMillis) { - super(runnable, null); + public ScheduledTask(Runnable task, long dueMillis, + Future check, AtomicBoolean cancelled) { + this.task = task; this.dueMillis = dueMillis; + this.check = check; + this.cancelled = cancelled; + } + + @Override + public void run() { + if (!cancelled.get()) task.run(); + } + + @Override + public void cancel() { + // Cancel any future executions of this task + cancelled.set(true); + // Cancel the scheduled check for due tasks + check.cancel(false); + // Remove the task from the queue + synchronized (lock) { + tasks.remove(this); + } } @Override diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/system/TaskScheduler.java b/bramble-api/src/main/java/org/briarproject/bramble/api/system/TaskScheduler.java index 7e1490cfa..de1b85e8a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/system/TaskScheduler.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/system/TaskScheduler.java @@ -3,7 +3,6 @@ package org.briarproject.bramble.api.system; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -18,7 +17,7 @@ public interface TaskScheduler { * If the platform supports wake locks, a wake lock will be held while * submitting and running the task. */ - Future schedule(Runnable task, Executor executor, long delay, + Cancellable schedule(Runnable task, Executor executor, long delay, TimeUnit unit); /** @@ -29,6 +28,16 @@ public interface TaskScheduler { * If the platform supports wake locks, a wake lock will be held while * submitting and running the task. */ - Future scheduleWithFixedDelay(Runnable task, Executor executor, + Cancellable scheduleWithFixedDelay(Runnable task, Executor executor, long delay, long interval, TimeUnit unit); + + interface Cancellable { + + /** + * Cancels the task if it has not already started running. If the task + * is {@link #scheduleWithFixedDelay(Runnable, Executor, long, long, TimeUnit) periodic}, + * all future executions of the task are cancelled. + */ + void cancel(); + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/io/TimeoutMonitorImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/io/TimeoutMonitorImpl.java index 825ddc9da..d080cabeb 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/io/TimeoutMonitorImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/io/TimeoutMonitorImpl.java @@ -4,6 +4,7 @@ import org.briarproject.bramble.api.io.TimeoutMonitor; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.api.system.TaskScheduler.Cancellable; import org.briarproject.bramble.api.system.Wakeful; import java.io.IOException; @@ -11,7 +12,6 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; @@ -38,7 +38,7 @@ class TimeoutMonitorImpl implements TimeoutMonitor { private final List streams = new ArrayList<>(); @GuardedBy("lock") - private Future task = null; + private Cancellable cancellable = null; @Inject TimeoutMonitorImpl(TaskScheduler scheduler, @@ -55,9 +55,9 @@ class TimeoutMonitorImpl implements TimeoutMonitor { timeoutMs, this::removeStream); synchronized (lock) { if (streams.isEmpty()) { - task = scheduler.scheduleWithFixedDelay(this::checkTimeouts, - ioExecutor, CHECK_INTERVAL_MS, CHECK_INTERVAL_MS, - MILLISECONDS); + cancellable = scheduler.scheduleWithFixedDelay( + this::checkTimeouts, ioExecutor, CHECK_INTERVAL_MS, + CHECK_INTERVAL_MS, MILLISECONDS); } streams.add(stream); } @@ -65,14 +65,17 @@ class TimeoutMonitorImpl implements TimeoutMonitor { } private void removeStream(TimeoutInputStream stream) { - Future toCancel = null; + Cancellable toCancel = null; synchronized (lock) { if (streams.remove(stream) && streams.isEmpty()) { - toCancel = task; - task = null; + toCancel = cancellable; + cancellable = null; } } - if (toCancel != null) toCancel.cancel(false); + if (toCancel != null) { + LOG.info("Cancelling timeout monitor task"); + toCancel.cancel(); + } } @IoExecutor diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java index e687aa6aa..4148d15f2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/PollerImpl.java @@ -27,6 +27,7 @@ import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.api.system.TaskScheduler.Cancellable; import org.briarproject.bramble.api.system.Wakeful; import org.briarproject.bramble.api.system.WakefulIoExecutor; @@ -37,7 +38,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; @@ -191,11 +191,11 @@ class PollerImpl implements Poller, EventListener { if (scheduled == null || due < scheduled.task.due) { // If a later task exists, cancel it. If it's already started // it will abort safely when it finds it's been replaced - if (scheduled != null) scheduled.future.cancel(false); + if (scheduled != null) scheduled.cancellable.cancel(); PollTask task = new PollTask(p, due, randomiseNext); - Future future = scheduler.schedule(task, ioExecutor, delay, - MILLISECONDS); - tasks.put(t, new ScheduledPollTask(task, future)); + Cancellable cancellable = scheduler.schedule(task, ioExecutor, + delay, MILLISECONDS); + tasks.put(t, new ScheduledPollTask(task, cancellable)); } } finally { lock.unlock(); @@ -206,7 +206,7 @@ class PollerImpl implements Poller, EventListener { lock.lock(); try { ScheduledPollTask scheduled = tasks.remove(t); - if (scheduled != null) scheduled.future.cancel(false); + if (scheduled != null) scheduled.cancellable.cancel(); } finally { lock.unlock(); } @@ -237,11 +237,11 @@ class PollerImpl implements Poller, EventListener { private class ScheduledPollTask { private final PollTask task; - private final Future future; + private final Cancellable cancellable; - private ScheduledPollTask(PollTask task, Future future) { + private ScheduledPollTask(PollTask task, Cancellable cancellable) { this.task = task; - this.future = future; + this.cancellable = cancellable; } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/system/TaskSchedulerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/system/TaskSchedulerImpl.java index 629f2ecd2..ef797a28c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/system/TaskSchedulerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/system/TaskSchedulerImpl.java @@ -4,8 +4,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.system.TaskScheduler; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.ThreadSafe; @@ -24,17 +24,20 @@ class TaskSchedulerImpl implements TaskScheduler { } @Override - public Future schedule(Runnable task, Executor executor, long delay, + public Cancellable schedule(Runnable task, Executor executor, long delay, TimeUnit unit) { Runnable execute = () -> executor.execute(task); - return scheduledExecutorService.schedule(execute, delay, unit); + ScheduledFuture future = + scheduledExecutorService.schedule(execute, delay, unit); + return () -> future.cancel(false); } @Override - public Future scheduleWithFixedDelay(Runnable task, Executor executor, + public Cancellable scheduleWithFixedDelay(Runnable task, Executor executor, long delay, long interval, TimeUnit unit) { Runnable execute = () -> executor.execute(task); - return scheduledExecutorService.scheduleWithFixedDelay(execute, delay, - interval, unit); + ScheduledFuture future = scheduledExecutorService. + scheduleWithFixedDelay(execute, delay, interval, unit); + return () -> future.cancel(false); } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java index 2a93fd39e..6c85747ac 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerImplTest.java @@ -20,6 +20,7 @@ import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.api.system.TaskScheduler.Cancellable; import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.ImmediateExecutor; import org.briarproject.bramble.test.RunAction; @@ -31,7 +32,6 @@ import org.junit.Test; import java.security.SecureRandom; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -55,7 +55,7 @@ public class PollerImplTest extends BrambleMockTestCase { private final TransportPropertyManager transportPropertyManager = context.mock(TransportPropertyManager.class); private final Clock clock = context.mock(Clock.class); - private final Future future = context.mock(Future.class); + private final Cancellable cancellable = context.mock(Cancellable.class); private final SecureRandom random; private final Executor ioExecutor = new ImmediateExecutor(); @@ -237,7 +237,7 @@ public class PollerImplTest extends BrambleMockTestCase { oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with((long) pollingInterval), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); }}); poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, @@ -266,7 +266,7 @@ public class PollerImplTest extends BrambleMockTestCase { oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with((long) pollingInterval), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); // Second event // Get the plugin oneOf(pluginManager).getPlugin(transportId); @@ -309,7 +309,7 @@ public class PollerImplTest extends BrambleMockTestCase { oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with((long) pollingInterval), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); // Second event // Get the plugin oneOf(pluginManager).getPlugin(transportId); @@ -322,7 +322,7 @@ public class PollerImplTest extends BrambleMockTestCase { will(returnValue(pollingInterval - 2)); oneOf(clock).currentTimeMillis(); will(returnValue(now + 1)); - oneOf(future).cancel(false); + oneOf(cancellable).cancel(); oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with((long) pollingInterval - 2), with(MILLISECONDS)); @@ -352,7 +352,7 @@ public class PollerImplTest extends BrambleMockTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with(0L), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); will(new RunAction()); // Running the polling task schedules the next polling task oneOf(plugin).getPollingInterval(); @@ -364,7 +364,7 @@ public class PollerImplTest extends BrambleMockTestCase { oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with((long) (pollingInterval * 0.5)), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); // Get the transport properties and connected contacts oneOf(transportPropertyManager).getRemoteProperties(transportId); will(returnValue(singletonMap(contactId, properties))); @@ -396,7 +396,7 @@ public class PollerImplTest extends BrambleMockTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with(0L), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); will(new RunAction()); // Running the polling task schedules the next polling task oneOf(plugin).getPollingInterval(); @@ -408,7 +408,7 @@ public class PollerImplTest extends BrambleMockTestCase { oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with((long) (pollingInterval * 0.5)), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); // Get the transport properties and connected contacts oneOf(transportPropertyManager).getRemoteProperties(transportId); will(returnValue(singletonMap(contactId, properties))); @@ -438,9 +438,9 @@ public class PollerImplTest extends BrambleMockTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with(0L), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); // The plugin is deactivated before the task runs - cancel the task - oneOf(future).cancel(false); + oneOf(cancellable).cancel(); }}); poller.eventOccurred(new TransportActiveEvent(transportId)); @@ -463,7 +463,7 @@ public class PollerImplTest extends BrambleMockTestCase { oneOf(scheduler).schedule(with(any(Runnable.class)), with(ioExecutor), with((long) pollingInterval), with(MILLISECONDS)); - will(returnValue(future)); + will(returnValue(cancellable)); }}); }