Merge branch '2292-contact-mailbox-download-worker' into 'master'

Mailbox download worker for a contact's mailbox

Closes #2292

See merge request briar/briar!1658
This commit is contained in:
Torsten Grote
2022-06-08 16:31:35 +00:00
11 changed files with 570 additions and 18 deletions

View File

@@ -22,10 +22,21 @@ interface ConnectivityChecker {
* the check succeeds. If a check is already running then the observer is
* called when the check succeeds. If a connectivity check has recently
* succeeded then the observer is called immediately.
* <p>
* Observers are removed after being called, or when the checker is
* {@link #destroy() destroyed}.
*/
void checkConnectivity(MailboxProperties properties,
ConnectivityObserver o);
/**
* Removes an observer that was added via
* {@link #checkConnectivity(MailboxProperties, ConnectivityObserver)}. If
* there are no remaining observers and a connectivity check is running
* then the check will be cancelled.
*/
void removeObserver(ConnectivityObserver o);
interface ConnectivityObserver {
void onConnectivityCheckSucceeded();
}

View File

@@ -80,8 +80,7 @@ abstract class ConnectivityCheckerImpl implements ConnectivityChecker {
> CONNECTIVITY_CHECK_FRESHNESS_MS) {
// The last connectivity check is stale, start a new one
connectivityObservers.add(o);
ApiCall task =
createConnectivityCheckTask(properties);
ApiCall task = createConnectivityCheckTask(properties);
connectivityCheck = mailboxApiCaller.retryWithBackoff(task);
} else {
// The last connectivity check is fresh
@@ -108,4 +107,16 @@ abstract class ConnectivityCheckerImpl implements ConnectivityChecker {
o.onConnectivityCheckSucceeded();
}
}
@Override
public void removeObserver(ConnectivityObserver o) {
synchronized (lock) {
if (destroyed) return;
connectivityObservers.remove(o);
if (connectivityObservers.isEmpty() && connectivityCheck != null) {
connectivityCheck.cancel();
connectivityCheck = null;
}
}
}
}

View File

@@ -5,8 +5,6 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
import java.io.IOException;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
@@ -24,16 +22,11 @@ class ContactMailboxConnectivityChecker extends ConnectivityCheckerImpl {
@Override
ApiCall createConnectivityCheckTask(MailboxProperties properties) {
if (properties.isOwner()) throw new IllegalArgumentException();
return new SimpleApiCall() {
@Override
void tryToCallApi() throws IOException, ApiException {
if (!mailboxApi.checkStatus(properties)) {
throw new ApiException();
}
// Call the observers and cache the result
onConnectivityCheckSucceeded(clock.currentTimeMillis());
}
};
return new SimpleApiCall(() -> {
if (!mailboxApi.checkStatus(properties)) throw new ApiException();
// Call the observers and cache the result
onConnectivityCheckSucceeded(clock.currentTimeMillis());
});
}
}

View File

@@ -0,0 +1,241 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.Cancellable;
import org.briarproject.bramble.api.mailbox.MailboxProperties;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
import static org.briarproject.bramble.util.LogUtils.logException;
@ThreadSafe
@NotNullByDefault
class ContactMailboxDownloadWorker implements MailboxWorker,
ConnectivityObserver, TorReachabilityObserver {
/**
* When the worker is started it waits for a connectivity check, then
* starts its first download cycle: checking the inbox, downloading and
* deleting any files, and checking again until the inbox is empty.
* <p>
* The worker then waits for our Tor hidden service to be reachable before
* starting its second download cycle. This ensures that if a contact
* tried and failed to connect to our hidden service before it was
* reachable, and therefore uploaded a file to the mailbox instead, we'll
* find the file in the second download cycle.
*/
private enum State {
CREATED,
CONNECTIVITY_CHECK,
DOWNLOAD_CYCLE_1,
WAITING_FOR_TOR,
DOWNLOAD_CYCLE_2,
FINISHED,
DESTROYED
}
private static final Logger LOG =
getLogger(ContactMailboxDownloadWorker.class.getName());
private final ConnectivityChecker connectivityChecker;
private final TorReachabilityMonitor torReachabilityMonitor;
private final MailboxApiCaller mailboxApiCaller;
private final MailboxApi mailboxApi;
private final MailboxFileManager mailboxFileManager;
private final MailboxProperties mailboxProperties;
private final Object lock = new Object();
@GuardedBy("lock")
private State state = State.CREATED;
@GuardedBy("lock")
@Nullable
private Cancellable apiCall = null;
ContactMailboxDownloadWorker(
ConnectivityChecker connectivityChecker,
TorReachabilityMonitor torReachabilityMonitor,
MailboxApiCaller mailboxApiCaller,
MailboxApi mailboxApi,
MailboxFileManager mailboxFileManager,
MailboxProperties mailboxProperties) {
if (mailboxProperties.isOwner()) throw new IllegalArgumentException();
this.connectivityChecker = connectivityChecker;
this.torReachabilityMonitor = torReachabilityMonitor;
this.mailboxApiCaller = mailboxApiCaller;
this.mailboxApi = mailboxApi;
this.mailboxFileManager = mailboxFileManager;
this.mailboxProperties = mailboxProperties;
}
@Override
public void start() {
LOG.info("Started");
synchronized (lock) {
// Don't allow the worker to be reused
if (state != State.CREATED) throw new IllegalStateException();
state = State.CONNECTIVITY_CHECK;
}
// Avoid leaking observer in case destroy() is called concurrently
// before observer is added
connectivityChecker.checkConnectivity(mailboxProperties, this);
boolean destroyed;
synchronized (lock) {
destroyed = state == State.DESTROYED;
}
if (destroyed) connectivityChecker.removeObserver(this);
}
@Override
public void destroy() {
LOG.info("Destroyed");
Cancellable apiCall;
synchronized (lock) {
state = State.DESTROYED;
apiCall = this.apiCall;
this.apiCall = null;
}
if (apiCall != null) apiCall.cancel();
connectivityChecker.removeObserver(this);
torReachabilityMonitor.removeObserver(this);
}
@Override
public void onConnectivityCheckSucceeded() {
LOG.info("Connectivity check succeeded");
synchronized (lock) {
if (state != State.CONNECTIVITY_CHECK) return;
state = State.DOWNLOAD_CYCLE_1;
// Start first download cycle
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(this::apiCallListInbox));
}
}
private void apiCallListInbox() throws IOException, ApiException {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
LOG.info("Listing inbox");
List<MailboxFile> files = mailboxApi.getFiles(mailboxProperties,
requireNonNull(mailboxProperties.getInboxId()));
if (files.isEmpty()) onDownloadCycleFinished();
else downloadNextFile(new LinkedList<>(files));
}
private void onDownloadCycleFinished() {
boolean addObserver = false;
synchronized (lock) {
if (state == State.DOWNLOAD_CYCLE_1) {
LOG.info("First download cycle finished");
state = State.WAITING_FOR_TOR;
addObserver = true;
} else if (state == State.DOWNLOAD_CYCLE_2) {
LOG.info("Second download cycle finished");
state = State.FINISHED;
}
}
if (addObserver) {
// Avoid leaking observer in case destroy() is called concurrently
// before observer is added
torReachabilityMonitor.addOneShotObserver(this);
boolean destroyed;
synchronized (lock) {
destroyed = state == State.DESTROYED;
}
if (destroyed) torReachabilityMonitor.removeObserver(this);
}
}
private void downloadNextFile(Queue<MailboxFile> queue) {
synchronized (lock) {
if (state == State.DESTROYED) return;
MailboxFile file = queue.remove();
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(() -> apiCallDownloadFile(file, queue)));
}
}
private void apiCallDownloadFile(MailboxFile file,
Queue<MailboxFile> queue) throws IOException, ApiException {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
LOG.info("Downloading file");
File tempFile = mailboxFileManager.createTempFileForDownload();
try {
mailboxApi.getFile(mailboxProperties,
requireNonNull(mailboxProperties.getInboxId()),
file.name, tempFile);
} catch (IOException | ApiException e) {
if (!tempFile.delete()) {
LOG.warning("Failed to delete temporary file");
}
throw e;
}
mailboxFileManager.handleDownloadedFile(tempFile);
deleteFile(file, queue);
}
private void deleteFile(MailboxFile file, Queue<MailboxFile> queue) {
synchronized (lock) {
if (state == State.DESTROYED) return;
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(() -> apiCallDeleteFile(file, queue)));
}
}
private void apiCallDeleteFile(MailboxFile file, Queue<MailboxFile> queue)
throws IOException, ApiException {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
try {
mailboxApi.deleteFile(mailboxProperties,
requireNonNull(mailboxProperties.getInboxId()), file.name);
} catch (TolerableFailureException e) {
// Catch this so we can continue to the next file
logException(LOG, INFO, e);
}
if (queue.isEmpty()) {
// List the inbox again to check for files that may have arrived
// while we were downloading
synchronized (lock) {
if (state == State.DESTROYED) return;
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(this::apiCallListInbox));
}
} else {
downloadNextFile(queue);
}
}
@Override
public void onTorReachable() {
LOG.info("Our Tor hidden service is reachable");
synchronized (lock) {
if (state != State.WAITING_FOR_TOR) return;
state = State.DOWNLOAD_CYCLE_2;
// Start second download cycle
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(this::apiCallListInbox));
}
}
}

View File

@@ -24,6 +24,8 @@ interface MailboxApiCaller {
* Asynchronously calls the given API call on the {@link IoExecutor},
* automatically retrying at increasing intervals until the API call
* returns false or retries are cancelled.
* <p>
* This method is safe to call while holding a lock.
*
* @return A {@link Cancellable} that can be used to cancel any future
* retries.

View File

@@ -0,0 +1,23 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.ThreadSafe;
/**
* A worker that downloads files from a contact's mailbox.
*/
@ThreadSafe
@NotNullByDefault
interface MailboxWorker {
/**
* Asynchronously starts the worker.
*/
void start();
/**
* Destroys the worker and cancels any pending tasks or retries.
*/
void destroy();
}

View File

@@ -16,17 +16,20 @@ import static org.briarproject.bramble.util.LogUtils.logException;
* Convenience class for making simple API calls that don't return values.
*/
@NotNullByDefault
public abstract class SimpleApiCall implements ApiCall {
class SimpleApiCall implements ApiCall {
private static final Logger LOG = getLogger(SimpleApiCall.class.getName());
abstract void tryToCallApi()
throws IOException, ApiException, TolerableFailureException;
private final Attempt attempt;
SimpleApiCall(Attempt attempt) {
this.attempt = attempt;
}
@Override
public boolean callApi() {
try {
tryToCallApi();
attempt.tryToCallApi();
return false; // Succeeded, don't retry
} catch (IOException | ApiException e) {
logException(LOG, WARNING, e);
@@ -36,4 +39,17 @@ public abstract class SimpleApiCall implements ApiCall {
return false; // Failed tolerably, don't retry
}
}
interface Attempt {
/**
* Makes a single attempt to call an API endpoint. If this method
* throws an {@link IOException} or an {@link ApiException}, the call
* will be retried. If it throws a {@link TolerableFailureException}
* or returns without throwing an exception, the call will not be
* retried.
*/
void tryToCallApi()
throws IOException, ApiException, TolerableFailureException;
}
}

View File

@@ -38,6 +38,12 @@ interface TorReachabilityMonitor {
*/
void addOneShotObserver(TorReachabilityObserver o);
/**
* Removes an observer that was added via
* {@link #addOneShotObserver(TorReachabilityObserver)}.
*/
void removeObserver(TorReachabilityObserver o);
interface TorReachabilityObserver {
void onTorReachable();

View File

@@ -87,6 +87,14 @@ class TorReachabilityMonitorImpl
if (callNow) o.onTorReachable();
}
@Override
public void removeObserver(TorReachabilityObserver o) {
synchronized (lock) {
if (destroyed) return;
observers.remove(o);
}
}
@Override
public void eventOccurred(Event e) {
if (e instanceof TransportActiveEvent) {