diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityChecker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityChecker.java index 921557027..d0a6ab57d 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityChecker.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityChecker.java @@ -22,10 +22,21 @@ interface ConnectivityChecker { * the check succeeds. If a check is already running then the observer is * called when the check succeeds. If a connectivity check has recently * succeeded then the observer is called immediately. + *
+ * Observers are removed after being called, or when the checker is + * {@link #destroy() destroyed}. */ void checkConnectivity(MailboxProperties properties, ConnectivityObserver o); + /** + * Removes an observer that was added via + * {@link #checkConnectivity(MailboxProperties, ConnectivityObserver)}. If + * there are no remaining observers and a connectivity check is running + * then the check will be cancelled. + */ + void removeObserver(ConnectivityObserver o); + interface ConnectivityObserver { void onConnectivityCheckSucceeded(); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImpl.java index 1468c5db1..aa3e97930 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImpl.java @@ -80,8 +80,7 @@ abstract class ConnectivityCheckerImpl implements ConnectivityChecker { > CONNECTIVITY_CHECK_FRESHNESS_MS) { // The last connectivity check is stale, start a new one connectivityObservers.add(o); - ApiCall task = - createConnectivityCheckTask(properties); + ApiCall task = createConnectivityCheckTask(properties); connectivityCheck = mailboxApiCaller.retryWithBackoff(task); } else { // The last connectivity check is fresh @@ -108,4 +107,16 @@ abstract class ConnectivityCheckerImpl implements ConnectivityChecker { o.onConnectivityCheckSucceeded(); } } + + @Override + public void removeObserver(ConnectivityObserver o) { + synchronized (lock) { + if (destroyed) return; + connectivityObservers.remove(o); + if (connectivityObservers.isEmpty() && connectivityCheck != null) { + connectivityCheck.cancel(); + connectivityCheck = null; + } + } + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxConnectivityChecker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxConnectivityChecker.java index 8759aa595..8922948c6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxConnectivityChecker.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxConnectivityChecker.java @@ -5,8 +5,6 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.mailbox.MailboxApi.ApiException; -import java.io.IOException; - import javax.annotation.concurrent.ThreadSafe; @ThreadSafe @@ -24,16 +22,11 @@ class ContactMailboxConnectivityChecker extends ConnectivityCheckerImpl { @Override ApiCall createConnectivityCheckTask(MailboxProperties properties) { if (properties.isOwner()) throw new IllegalArgumentException(); - return new SimpleApiCall() { - @Override - void tryToCallApi() throws IOException, ApiException { - if (!mailboxApi.checkStatus(properties)) { - throw new ApiException(); - } - // Call the observers and cache the result - onConnectivityCheckSucceeded(clock.currentTimeMillis()); - } - }; + return new SimpleApiCall(() -> { + if (!mailboxApi.checkStatus(properties)) throw new ApiException(); + // Call the observers and cache the result + onConnectivityCheckSucceeded(clock.currentTimeMillis()); + }); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorker.java new file mode 100644 index 000000000..70e4266f6 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorker.java @@ -0,0 +1,241 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.mailbox.MailboxProperties; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver; +import org.briarproject.bramble.mailbox.MailboxApi.ApiException; +import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile; +import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException; +import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.logging.Logger; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import static java.util.logging.Level.INFO; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; +import static org.briarproject.bramble.util.LogUtils.logException; + +@ThreadSafe +@NotNullByDefault +class ContactMailboxDownloadWorker implements MailboxWorker, + ConnectivityObserver, TorReachabilityObserver { + + /** + * When the worker is started it waits for a connectivity check, then + * starts its first download cycle: checking the inbox, downloading and + * deleting any files, and checking again until the inbox is empty. + *
+ * The worker then waits for our Tor hidden service to be reachable before
+ * starting its second download cycle. This ensures that if a contact
+ * tried and failed to connect to our hidden service before it was
+ * reachable, and therefore uploaded a file to the mailbox instead, we'll
+ * find the file in the second download cycle.
+ */
+ private enum State {
+ CREATED,
+ CONNECTIVITY_CHECK,
+ DOWNLOAD_CYCLE_1,
+ WAITING_FOR_TOR,
+ DOWNLOAD_CYCLE_2,
+ FINISHED,
+ DESTROYED
+ }
+
+ private static final Logger LOG =
+ getLogger(ContactMailboxDownloadWorker.class.getName());
+
+ private final ConnectivityChecker connectivityChecker;
+ private final TorReachabilityMonitor torReachabilityMonitor;
+ private final MailboxApiCaller mailboxApiCaller;
+ private final MailboxApi mailboxApi;
+ private final MailboxFileManager mailboxFileManager;
+ private final MailboxProperties mailboxProperties;
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private State state = State.CREATED;
+
+ @GuardedBy("lock")
+ @Nullable
+ private Cancellable apiCall = null;
+
+ ContactMailboxDownloadWorker(
+ ConnectivityChecker connectivityChecker,
+ TorReachabilityMonitor torReachabilityMonitor,
+ MailboxApiCaller mailboxApiCaller,
+ MailboxApi mailboxApi,
+ MailboxFileManager mailboxFileManager,
+ MailboxProperties mailboxProperties) {
+ if (mailboxProperties.isOwner()) throw new IllegalArgumentException();
+ this.connectivityChecker = connectivityChecker;
+ this.torReachabilityMonitor = torReachabilityMonitor;
+ this.mailboxApiCaller = mailboxApiCaller;
+ this.mailboxApi = mailboxApi;
+ this.mailboxFileManager = mailboxFileManager;
+ this.mailboxProperties = mailboxProperties;
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Started");
+ synchronized (lock) {
+ // Don't allow the worker to be reused
+ if (state != State.CREATED) throw new IllegalStateException();
+ state = State.CONNECTIVITY_CHECK;
+ }
+ // Avoid leaking observer in case destroy() is called concurrently
+ // before observer is added
+ connectivityChecker.checkConnectivity(mailboxProperties, this);
+ boolean destroyed;
+ synchronized (lock) {
+ destroyed = state == State.DESTROYED;
+ }
+ if (destroyed) connectivityChecker.removeObserver(this);
+ }
+
+ @Override
+ public void destroy() {
+ LOG.info("Destroyed");
+ Cancellable apiCall;
+ synchronized (lock) {
+ state = State.DESTROYED;
+ apiCall = this.apiCall;
+ this.apiCall = null;
+ }
+ if (apiCall != null) apiCall.cancel();
+ connectivityChecker.removeObserver(this);
+ torReachabilityMonitor.removeObserver(this);
+ }
+
+ @Override
+ public void onConnectivityCheckSucceeded() {
+ LOG.info("Connectivity check succeeded");
+ synchronized (lock) {
+ if (state != State.CONNECTIVITY_CHECK) return;
+ state = State.DOWNLOAD_CYCLE_1;
+ // Start first download cycle
+ apiCall = mailboxApiCaller.retryWithBackoff(
+ new SimpleApiCall(this::apiCallListInbox));
+ }
+ }
+
+ private void apiCallListInbox() throws IOException, ApiException {
+ synchronized (lock) {
+ if (state == State.DESTROYED) return;
+ }
+ LOG.info("Listing inbox");
+ List
+ * This method is safe to call while holding a lock.
*
* @return A {@link Cancellable} that can be used to cancel any future
* retries.
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxWorker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxWorker.java
new file mode 100644
index 000000000..75b0f7ce7
--- /dev/null
+++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxWorker.java
@@ -0,0 +1,23 @@
+package org.briarproject.bramble.mailbox;
+
+import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A worker that downloads files from a contact's mailbox.
+ */
+@ThreadSafe
+@NotNullByDefault
+interface MailboxWorker {
+
+ /**
+ * Asynchronously starts the worker.
+ */
+ void start();
+
+ /**
+ * Destroys the worker and cancels any pending tasks or retries.
+ */
+ void destroy();
+}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/SimpleApiCall.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/SimpleApiCall.java
index 7c3e6f4b0..9a34a6244 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/SimpleApiCall.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/SimpleApiCall.java
@@ -16,17 +16,20 @@ import static org.briarproject.bramble.util.LogUtils.logException;
* Convenience class for making simple API calls that don't return values.
*/
@NotNullByDefault
-public abstract class SimpleApiCall implements ApiCall {
+class SimpleApiCall implements ApiCall {
private static final Logger LOG = getLogger(SimpleApiCall.class.getName());
- abstract void tryToCallApi()
- throws IOException, ApiException, TolerableFailureException;
+ private final Attempt attempt;
+
+ SimpleApiCall(Attempt attempt) {
+ this.attempt = attempt;
+ }
@Override
public boolean callApi() {
try {
- tryToCallApi();
+ attempt.tryToCallApi();
return false; // Succeeded, don't retry
} catch (IOException | ApiException e) {
logException(LOG, WARNING, e);
@@ -36,4 +39,17 @@ public abstract class SimpleApiCall implements ApiCall {
return false; // Failed tolerably, don't retry
}
}
+
+ interface Attempt {
+
+ /**
+ * Makes a single attempt to call an API endpoint. If this method
+ * throws an {@link IOException} or an {@link ApiException}, the call
+ * will be retried. If it throws a {@link TolerableFailureException}
+ * or returns without throwing an exception, the call will not be
+ * retried.
+ */
+ void tryToCallApi()
+ throws IOException, ApiException, TolerableFailureException;
+ }
}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitor.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitor.java
index c6b86d88a..5dd10b365 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitor.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitor.java
@@ -38,6 +38,12 @@ interface TorReachabilityMonitor {
*/
void addOneShotObserver(TorReachabilityObserver o);
+ /**
+ * Removes an observer that was added via
+ * {@link #addOneShotObserver(TorReachabilityObserver)}.
+ */
+ void removeObserver(TorReachabilityObserver o);
+
interface TorReachabilityObserver {
void onTorReachable();
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImpl.java
index fa16682b3..8620aee01 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/TorReachabilityMonitorImpl.java
@@ -87,6 +87,14 @@ class TorReachabilityMonitorImpl
if (callNow) o.onTorReachable();
}
+ @Override
+ public void removeObserver(TorReachabilityObserver o) {
+ synchronized (lock) {
+ if (destroyed) return;
+ observers.remove(o);
+ }
+ }
+
@Override
public void eventOccurred(Event e) {
if (e instanceof TransportActiveEvent) {
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImplTest.java
index 09fb0df5b..a476ec552 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImplTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ConnectivityCheckerImplTest.java
@@ -187,6 +187,32 @@ public class ConnectivityCheckerImplTest extends BrambleMockTestCase {
checker.onConnectivityCheckSucceeded(now);
}
+ @Test
+ public void testCheckIsCancelledWhenObserverIsRemoved() {
+ ConnectivityCheckerImpl checker = createChecker();
+
+ // When checkConnectivity() is called a check should be started
+ context.checking(new Expectations() {{
+ oneOf(clock).currentTimeMillis();
+ will(returnValue(now));
+ oneOf(mailboxApiCaller).retryWithBackoff(apiCall);
+ will(returnValue(task));
+ }});
+
+ checker.checkConnectivity(properties, observer1);
+
+ // When the observer is removed the check should be cancelled
+ context.checking(new Expectations() {{
+ oneOf(task).cancel();
+ }});
+
+ checker.removeObserver(observer1);
+
+ // If the check runs anyway (cancellation came too late) the observer
+ // should not be called
+ checker.onConnectivityCheckSucceeded(now);
+ }
+
private ConnectivityCheckerImpl createChecker() {
return new ConnectivityCheckerImpl(clock, mailboxApiCaller) {
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorkerTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorkerTest.java
new file mode 100644
index 000000000..273c4f016
--- /dev/null
+++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorkerTest.java
@@ -0,0 +1,215 @@
+package org.briarproject.bramble.mailbox;
+
+import org.briarproject.bramble.api.mailbox.MailboxFileId;
+import org.briarproject.bramble.api.mailbox.MailboxProperties;
+import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
+import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
+import org.briarproject.bramble.test.BrambleMockTestCase;
+import org.briarproject.bramble.test.CaptureArgumentAction;
+import org.jmock.Expectations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
+import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
+import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
+import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
+import static org.briarproject.bramble.test.TestUtils.getRandomId;
+import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
+import static org.junit.Assert.assertFalse;
+
+public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
+
+ private final ConnectivityChecker connectivityChecker =
+ context.mock(ConnectivityChecker.class);
+ private final TorReachabilityMonitor torReachabilityMonitor =
+ context.mock(TorReachabilityMonitor.class);
+ private final MailboxApiCaller mailboxApiCaller =
+ context.mock(MailboxApiCaller.class);
+ private final MailboxApi mailboxApi = context.mock(MailboxApi.class);
+ private final MailboxFileManager mailboxFileManager =
+ context.mock(MailboxFileManager.class);
+
+ private final MailboxProperties mailboxProperties =
+ getMailboxProperties(false, CLIENT_SUPPORTS);
+ private final long now = System.currentTimeMillis();
+ private final MailboxFile file1 =
+ new MailboxFile(new MailboxFileId(getRandomId()), now - 1);
+ private final MailboxFile file2 =
+ new MailboxFile(new MailboxFileId(getRandomId()), now);
+ private final List