diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitor.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitor.java new file mode 100644 index 000000000..c6b86d88a --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitor.java @@ -0,0 +1,45 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.Plugin; + +import javax.annotation.concurrent.ThreadSafe; + +import static java.util.concurrent.TimeUnit.MINUTES; + +@ThreadSafe +@NotNullByDefault +interface TorReachabilityMonitor { + + /** + * How long the Tor plugin needs to be continuously + * {@link Plugin.State#ACTIVE active} before we assume our contacts can + * reach our hidden service. + */ + long REACHABILITY_PERIOD_MS = MINUTES.toMillis(10); + + /** + * Starts the monitor. + */ + void start(); + + /** + * Destroys the monitor. + */ + void destroy(); + + /** + * Adds an observer that will be called when our Tor hidden service becomes + * reachable. If our hidden service is already reachable, the observer is + * called immediately. + *

+ * Observers are removed after being called, or when the monitor is + * {@link #destroy() destroyed}. + */ + void addOneShotObserver(TorReachabilityObserver o); + + interface TorReachabilityObserver { + + void onTorReachable(); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImpl.java new file mode 100644 index 000000000..fa16682b3 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImpl.java @@ -0,0 +1,129 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.event.EventListener; +import org.briarproject.bramble.api.lifecycle.IoExecutor; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.Plugin; +import org.briarproject.bramble.api.plugin.PluginManager; +import org.briarproject.bramble.api.plugin.event.TransportActiveEvent; +import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; +import org.briarproject.bramble.api.system.TaskScheduler; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.briarproject.bramble.api.plugin.Plugin.State.ACTIVE; +import static org.briarproject.bramble.api.plugin.TorConstants.ID; + +@ThreadSafe +@NotNullByDefault +class TorReachabilityMonitorImpl + implements TorReachabilityMonitor, EventListener { + + private final Executor ioExecutor; + private final TaskScheduler taskScheduler; + private final PluginManager pluginManager; + private final EventBus eventBus; + private final Object lock = new Object(); + + @GuardedBy("lock") + private boolean reachable = false, destroyed = false; + + @GuardedBy("lock") + private final List observers = new ArrayList<>(); + + @GuardedBy("lock") + @Nullable + private Cancellable task = null; + + @Inject + TorReachabilityMonitorImpl( + @IoExecutor Executor ioExecutor, + TaskScheduler taskScheduler, + PluginManager pluginManager, + EventBus eventBus) { + this.ioExecutor = ioExecutor; + this.taskScheduler = taskScheduler; + this.pluginManager = pluginManager; + this.eventBus = eventBus; + } + + @Override + public void start() { + eventBus.addListener(this); + Plugin plugin = pluginManager.getPlugin(ID); + if (plugin != null && plugin.getState() == ACTIVE) onTorActive(); + } + + @Override + public void destroy() { + eventBus.removeListener(this); + synchronized (lock) { + destroyed = true; + if (task != null) task.cancel(); + task = null; + observers.clear(); + } + } + + @Override + public void addOneShotObserver(TorReachabilityObserver o) { + boolean callNow = false; + synchronized (lock) { + if (destroyed) return; + if (reachable) callNow = true; + else observers.add(o); + } + if (callNow) o.onTorReachable(); + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof TransportActiveEvent) { + TransportActiveEvent t = (TransportActiveEvent) e; + if (t.getTransportId().equals(ID)) onTorActive(); + } else if (e instanceof TransportInactiveEvent) { + TransportInactiveEvent t = (TransportInactiveEvent) e; + if (t.getTransportId().equals(ID)) onTorInactive(); + } + } + + private void onTorActive() { + synchronized (lock) { + if (destroyed || task != null) return; + task = taskScheduler.schedule(this::onTorReachable, ioExecutor, + REACHABILITY_PERIOD_MS, MILLISECONDS); + } + } + + private void onTorInactive() { + synchronized (lock) { + reachable = false; + if (task != null) task.cancel(); + task = null; + } + } + + @IoExecutor + private void onTorReachable() { + List observers; + synchronized (lock) { + if (destroyed) return; + reachable = true; + observers = new ArrayList<>(this.observers); + this.observers.clear(); + task = null; + } + for (TorReachabilityObserver o : observers) o.onTorReachable(); + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImplTest.java new file mode 100644 index 000000000..54e30db85 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImplTest.java @@ -0,0 +1,246 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.plugin.Plugin; +import org.briarproject.bramble.api.plugin.PluginManager; +import org.briarproject.bramble.api.plugin.event.TransportActiveEvent; +import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; +import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver; +import org.briarproject.bramble.test.BrambleMockTestCase; +import org.briarproject.bramble.test.CaptureArgumentAction; +import org.jmock.Expectations; +import org.jmock.lib.action.DoAllAction; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.briarproject.bramble.api.plugin.Plugin.State.ACTIVE; +import static org.briarproject.bramble.api.plugin.Plugin.State.DISABLED; +import static org.briarproject.bramble.api.plugin.Plugin.State.ENABLING; +import static org.briarproject.bramble.api.plugin.TorConstants.ID; +import static org.briarproject.bramble.mailbox.TorReachabilityMonitor.REACHABILITY_PERIOD_MS; + +public class TorReachabilityMonitorImplTest extends BrambleMockTestCase { + + private final Executor ioExecutor = context.mock(Executor.class); + private final TaskScheduler taskScheduler = + context.mock(TaskScheduler.class); + private final PluginManager pluginManager = + context.mock(PluginManager.class); + private final EventBus eventBus = context.mock(EventBus.class); + private final Plugin plugin = context.mock(Plugin.class); + private final Cancellable scheduledTask = context.mock(Cancellable.class); + private final TorReachabilityObserver observer = + context.mock(TorReachabilityObserver.class); + + private TorReachabilityMonitorImpl monitor; + + @Before + public void setUp() { + monitor = new TorReachabilityMonitorImpl(ioExecutor, taskScheduler, + pluginManager, eventBus); + } + + @Test + public void testSchedulesTaskWhenStartedIfTorIsActive() { + // Starting the monitor should schedule a task + context.checking(new Expectations() {{ + oneOf(eventBus).addListener(monitor); + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).getState(); + will(returnValue(ACTIVE)); + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(REACHABILITY_PERIOD_MS), + with(MILLISECONDS)); + will(returnValue(scheduledTask)); + }}); + + monitor.start(); + + // If Tor has only just become active and the TransportActiveEvent + // arrives after the task has already been scheduled, a second task + // should not be scheduled + monitor.eventOccurred(new TransportActiveEvent(ID)); + + // Destroying the monitor should cancel the task + context.checking(new Expectations() {{ + oneOf(eventBus).removeListener(monitor); + oneOf(scheduledTask).cancel(); + }}); + + monitor.destroy(); + } + + @Test + public void testSchedulesTaskWhenTorBecomesActive() { + // Starting the monitor should not schedule a task as Tor is inactive + context.checking(new Expectations() {{ + oneOf(eventBus).addListener(monitor); + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).getState(); + will(returnValue(ENABLING)); + }}); + + monitor.start(); + + // When Tor becomes active, a task should be scheduled + context.checking(new Expectations() {{ + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(REACHABILITY_PERIOD_MS), + with(MILLISECONDS)); + will(returnValue(scheduledTask)); + }}); + + monitor.eventOccurred(new TransportActiveEvent(ID)); + + // Destroying the monitor should cancel the task + context.checking(new Expectations() {{ + oneOf(eventBus).removeListener(monitor); + oneOf(scheduledTask).cancel(); + }}); + + monitor.destroy(); + } + + @Test + public void testCancelsTaskWhenTorBecomesInactive() { + // Starting the monitor should schedule a task + context.checking(new Expectations() {{ + oneOf(eventBus).addListener(monitor); + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).getState(); + will(returnValue(ACTIVE)); + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(REACHABILITY_PERIOD_MS), + with(MILLISECONDS)); + will(returnValue(scheduledTask)); + }}); + + monitor.start(); + + // When Tor becomes inactive, the task should be cancelled + context.checking(new Expectations() {{ + oneOf(scheduledTask).cancel(); + }}); + + monitor.eventOccurred(new TransportInactiveEvent(ID)); + + // Destroying the monitor should not affect the task, which has + // already been cancelled + context.checking(new Expectations() {{ + oneOf(eventBus).removeListener(monitor); + }}); + + monitor.destroy(); + } + + @Test + public void testObserverRegisteredBeforeTorBecomesActiveIsCalled() { + // Starting the monitor should not schedule a task as Tor is inactive + context.checking(new Expectations() {{ + oneOf(eventBus).addListener(monitor); + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).getState(); + will(returnValue(DISABLED)); + }}); + + monitor.start(); + + // Register an observer + monitor.addOneShotObserver(observer); + + // When Tor becomes active, a task should be scheduled + AtomicReference runnable = new AtomicReference<>(null); + context.checking(new Expectations() {{ + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(REACHABILITY_PERIOD_MS), + with(MILLISECONDS)); + will(new DoAllAction( + new CaptureArgumentAction<>(runnable, Runnable.class, 0), + returnValue(scheduledTask) + )); + }}); + + monitor.eventOccurred(new TransportActiveEvent(ID)); + + // When the task runs, the observer should be called + context.checking(new Expectations() {{ + oneOf(observer).onTorReachable(); + }}); + + runnable.get().run(); + } + + @Test + public void testObserverRegisteredBeforeTorBecomesReachableIsCalled() { + // Starting the monitor should schedule a task + AtomicReference runnable = new AtomicReference<>(null); + context.checking(new Expectations() {{ + oneOf(eventBus).addListener(monitor); + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).getState(); + will(returnValue(ACTIVE)); + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(REACHABILITY_PERIOD_MS), + with(MILLISECONDS)); + will(new DoAllAction( + new CaptureArgumentAction<>(runnable, Runnable.class, 0), + returnValue(scheduledTask) + )); + }}); + + monitor.start(); + + // Register an observer + monitor.addOneShotObserver(observer); + + // When the task runs, the observer should be called + context.checking(new Expectations() {{ + oneOf(observer).onTorReachable(); + }}); + + runnable.get().run(); + } + + @Test + public void testObserverRegisteredAfterTorBecomesReachableIsCalled() { + // Starting the monitor should schedule a task + AtomicReference runnable = new AtomicReference<>(null); + context.checking(new Expectations() {{ + oneOf(eventBus).addListener(monitor); + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).getState(); + will(returnValue(ACTIVE)); + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(REACHABILITY_PERIOD_MS), + with(MILLISECONDS)); + will(new DoAllAction( + new CaptureArgumentAction<>(runnable, Runnable.class, 0), + returnValue(scheduledTask) + )); + }}); + + monitor.start(); + + // When the task runs, no observers have been registered yet + runnable.get().run(); + + // When an observer is registered, it should be called immediately + context.checking(new Expectations() {{ + oneOf(observer).onTorReachable(); + }}); + + monitor.addOneShotObserver(observer); + } +}