mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-16 04:39:54 +01:00
Merge branch '2293-own-mailbox-download-worker' into 'master'
Mailbox download worker for our own mailbox Closes #2293 See merge request briar/briar!1689
This commit is contained in:
@@ -1,73 +1,23 @@
|
|||||||
package org.briarproject.bramble.mailbox;
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
import org.briarproject.bramble.api.Cancellable;
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
|
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
|
||||||
import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.logging.Logger;
|
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import javax.annotation.concurrent.GuardedBy;
|
|
||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
import static java.util.logging.Level.INFO;
|
|
||||||
import static java.util.logging.Logger.getLogger;
|
|
||||||
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
||||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
|
||||||
|
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
@NotNullByDefault
|
@NotNullByDefault
|
||||||
class ContactMailboxDownloadWorker implements MailboxWorker,
|
class ContactMailboxDownloadWorker extends MailboxDownloadWorker {
|
||||||
ConnectivityObserver, TorReachabilityObserver {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* When the worker is started it waits for a connectivity check, then
|
|
||||||
* starts its first download cycle: checking the inbox, downloading and
|
|
||||||
* deleting any files, and checking again until the inbox is empty.
|
|
||||||
* <p>
|
|
||||||
* The worker then waits for our Tor hidden service to be reachable before
|
|
||||||
* starting its second download cycle. This ensures that if a contact
|
|
||||||
* tried and failed to connect to our hidden service before it was
|
|
||||||
* reachable, and therefore uploaded a file to the mailbox instead, we'll
|
|
||||||
* find the file in the second download cycle.
|
|
||||||
*/
|
|
||||||
private enum State {
|
|
||||||
CREATED,
|
|
||||||
CONNECTIVITY_CHECK,
|
|
||||||
DOWNLOAD_CYCLE_1,
|
|
||||||
WAITING_FOR_TOR,
|
|
||||||
DOWNLOAD_CYCLE_2,
|
|
||||||
FINISHED,
|
|
||||||
DESTROYED
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
getLogger(ContactMailboxDownloadWorker.class.getName());
|
|
||||||
|
|
||||||
private final ConnectivityChecker connectivityChecker;
|
|
||||||
private final TorReachabilityMonitor torReachabilityMonitor;
|
|
||||||
private final MailboxApiCaller mailboxApiCaller;
|
|
||||||
private final MailboxApi mailboxApi;
|
|
||||||
private final MailboxFileManager mailboxFileManager;
|
|
||||||
private final MailboxProperties mailboxProperties;
|
|
||||||
private final Object lock = new Object();
|
|
||||||
|
|
||||||
@GuardedBy("lock")
|
|
||||||
private State state = State.CREATED;
|
|
||||||
|
|
||||||
@GuardedBy("lock")
|
|
||||||
@Nullable
|
|
||||||
private Cancellable apiCall = null;
|
|
||||||
|
|
||||||
ContactMailboxDownloadWorker(
|
ContactMailboxDownloadWorker(
|
||||||
ConnectivityChecker connectivityChecker,
|
ConnectivityChecker connectivityChecker,
|
||||||
@@ -76,57 +26,14 @@ class ContactMailboxDownloadWorker implements MailboxWorker,
|
|||||||
MailboxApi mailboxApi,
|
MailboxApi mailboxApi,
|
||||||
MailboxFileManager mailboxFileManager,
|
MailboxFileManager mailboxFileManager,
|
||||||
MailboxProperties mailboxProperties) {
|
MailboxProperties mailboxProperties) {
|
||||||
|
super(connectivityChecker, torReachabilityMonitor, mailboxApiCaller,
|
||||||
|
mailboxApi, mailboxFileManager, mailboxProperties);
|
||||||
if (mailboxProperties.isOwner()) throw new IllegalArgumentException();
|
if (mailboxProperties.isOwner()) throw new IllegalArgumentException();
|
||||||
this.connectivityChecker = connectivityChecker;
|
|
||||||
this.torReachabilityMonitor = torReachabilityMonitor;
|
|
||||||
this.mailboxApiCaller = mailboxApiCaller;
|
|
||||||
this.mailboxApi = mailboxApi;
|
|
||||||
this.mailboxFileManager = mailboxFileManager;
|
|
||||||
this.mailboxProperties = mailboxProperties;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
protected ApiCall createApiCallForDownloadCycle() {
|
||||||
LOG.info("Started");
|
return new SimpleApiCall(this::apiCallListInbox);
|
||||||
synchronized (lock) {
|
|
||||||
// Don't allow the worker to be reused
|
|
||||||
if (state != State.CREATED) return;
|
|
||||||
state = State.CONNECTIVITY_CHECK;
|
|
||||||
}
|
|
||||||
// Avoid leaking observer in case destroy() is called concurrently
|
|
||||||
// before observer is added
|
|
||||||
connectivityChecker.checkConnectivity(mailboxProperties, this);
|
|
||||||
boolean destroyed;
|
|
||||||
synchronized (lock) {
|
|
||||||
destroyed = state == State.DESTROYED;
|
|
||||||
}
|
|
||||||
if (destroyed) connectivityChecker.removeObserver(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void destroy() {
|
|
||||||
LOG.info("Destroyed");
|
|
||||||
Cancellable apiCall;
|
|
||||||
synchronized (lock) {
|
|
||||||
state = State.DESTROYED;
|
|
||||||
apiCall = this.apiCall;
|
|
||||||
this.apiCall = null;
|
|
||||||
}
|
|
||||||
if (apiCall != null) apiCall.cancel();
|
|
||||||
connectivityChecker.removeObserver(this);
|
|
||||||
torReachabilityMonitor.removeObserver(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onConnectivityCheckSucceeded() {
|
|
||||||
LOG.info("Connectivity check succeeded");
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state != State.CONNECTIVITY_CHECK) return;
|
|
||||||
state = State.DOWNLOAD_CYCLE_1;
|
|
||||||
// Start first download cycle
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListInbox));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void apiCallListInbox() throws IOException, ApiException {
|
private void apiCallListInbox() throws IOException, ApiException {
|
||||||
@@ -134,110 +41,18 @@ class ContactMailboxDownloadWorker implements MailboxWorker,
|
|||||||
if (state == State.DESTROYED) return;
|
if (state == State.DESTROYED) return;
|
||||||
}
|
}
|
||||||
LOG.info("Listing inbox");
|
LOG.info("Listing inbox");
|
||||||
List<MailboxFile> files = mailboxApi.getFiles(mailboxProperties,
|
MailboxFolderId folderId =
|
||||||
requireNonNull(mailboxProperties.getInboxId()));
|
requireNonNull(mailboxProperties.getInboxId());
|
||||||
if (files.isEmpty()) onDownloadCycleFinished();
|
List<MailboxFile> files =
|
||||||
else downloadNextFile(new LinkedList<>(files));
|
mailboxApi.getFiles(mailboxProperties, folderId);
|
||||||
}
|
if (files.isEmpty()) {
|
||||||
|
onDownloadCycleFinished();
|
||||||
private void onDownloadCycleFinished() {
|
|
||||||
boolean addObserver = false;
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DOWNLOAD_CYCLE_1) {
|
|
||||||
LOG.info("First download cycle finished");
|
|
||||||
state = State.WAITING_FOR_TOR;
|
|
||||||
apiCall = null;
|
|
||||||
addObserver = true;
|
|
||||||
} else if (state == State.DOWNLOAD_CYCLE_2) {
|
|
||||||
LOG.info("Second download cycle finished");
|
|
||||||
state = State.FINISHED;
|
|
||||||
apiCall = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (addObserver) {
|
|
||||||
// Avoid leaking observer in case destroy() is called concurrently
|
|
||||||
// before observer is added
|
|
||||||
torReachabilityMonitor.addOneShotObserver(this);
|
|
||||||
boolean destroyed;
|
|
||||||
synchronized (lock) {
|
|
||||||
destroyed = state == State.DESTROYED;
|
|
||||||
}
|
|
||||||
if (destroyed) torReachabilityMonitor.removeObserver(this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void downloadNextFile(Queue<MailboxFile> queue) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
MailboxFile file = queue.remove();
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(() -> apiCallDownloadFile(file, queue)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void apiCallDownloadFile(MailboxFile file,
|
|
||||||
Queue<MailboxFile> queue) throws IOException, ApiException {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
}
|
|
||||||
LOG.info("Downloading file");
|
|
||||||
File tempFile = mailboxFileManager.createTempFileForDownload();
|
|
||||||
try {
|
|
||||||
mailboxApi.getFile(mailboxProperties,
|
|
||||||
requireNonNull(mailboxProperties.getInboxId()),
|
|
||||||
file.name, tempFile);
|
|
||||||
} catch (IOException | ApiException e) {
|
|
||||||
if (!tempFile.delete()) {
|
|
||||||
LOG.warning("Failed to delete temporary file");
|
|
||||||
}
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
mailboxFileManager.handleDownloadedFile(tempFile);
|
|
||||||
deleteFile(file, queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void deleteFile(MailboxFile file, Queue<MailboxFile> queue) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(() -> apiCallDeleteFile(file, queue)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void apiCallDeleteFile(MailboxFile file, Queue<MailboxFile> queue)
|
|
||||||
throws IOException, ApiException {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
mailboxApi.deleteFile(mailboxProperties,
|
|
||||||
requireNonNull(mailboxProperties.getInboxId()), file.name);
|
|
||||||
} catch (TolerableFailureException e) {
|
|
||||||
// Catch this so we can continue to the next file
|
|
||||||
logException(LOG, INFO, e);
|
|
||||||
}
|
|
||||||
if (queue.isEmpty()) {
|
|
||||||
// List the inbox again to check for files that may have arrived
|
|
||||||
// while we were downloading
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListInbox));
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
Queue<FolderFile> queue = new LinkedList<>();
|
||||||
|
for (MailboxFile file : files) {
|
||||||
|
queue.add(new FolderFile(folderId, file.name));
|
||||||
|
}
|
||||||
downloadNextFile(queue);
|
downloadNextFile(queue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onTorReachable() {
|
|
||||||
LOG.info("Our Tor hidden service is reachable");
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state != State.WAITING_FOR_TOR) return;
|
|
||||||
state = State.DOWNLOAD_CYCLE_2;
|
|
||||||
// Start second download cycle
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListInbox));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,246 @@
|
|||||||
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.Cancellable;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
||||||
|
import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import javax.annotation.concurrent.GuardedBy;
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
|
import static java.util.logging.Level.INFO;
|
||||||
|
import static java.util.logging.Logger.getLogger;
|
||||||
|
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||||
|
|
||||||
|
@ThreadSafe
|
||||||
|
@NotNullByDefault
|
||||||
|
abstract class MailboxDownloadWorker implements MailboxWorker,
|
||||||
|
ConnectivityObserver, TorReachabilityObserver {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When the worker is started it waits for a connectivity check, then
|
||||||
|
* starts its first download cycle: checking for files to download,
|
||||||
|
* downloading and deleting the files, and checking again until all files
|
||||||
|
* have been downloaded and deleted.
|
||||||
|
* <p>
|
||||||
|
* The worker then waits for our Tor hidden service to be reachable before
|
||||||
|
* starting its second download cycle. This ensures that if a contact
|
||||||
|
* tried and failed to connect to our hidden service before it was
|
||||||
|
* reachable, and therefore uploaded a file to the mailbox instead, we'll
|
||||||
|
* find the file in the second download cycle.
|
||||||
|
*/
|
||||||
|
protected enum State {
|
||||||
|
CREATED,
|
||||||
|
CONNECTIVITY_CHECK,
|
||||||
|
DOWNLOAD_CYCLE_1,
|
||||||
|
WAITING_FOR_TOR,
|
||||||
|
DOWNLOAD_CYCLE_2,
|
||||||
|
FINISHED,
|
||||||
|
DESTROYED
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static final Logger LOG =
|
||||||
|
getLogger(MailboxDownloadWorker.class.getName());
|
||||||
|
|
||||||
|
private final ConnectivityChecker connectivityChecker;
|
||||||
|
private final TorReachabilityMonitor torReachabilityMonitor;
|
||||||
|
protected final MailboxApiCaller mailboxApiCaller;
|
||||||
|
protected final MailboxApi mailboxApi;
|
||||||
|
private final MailboxFileManager mailboxFileManager;
|
||||||
|
protected final MailboxProperties mailboxProperties;
|
||||||
|
protected final Object lock = new Object();
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
protected State state = State.CREATED;
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
@Nullable
|
||||||
|
protected Cancellable apiCall = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the API call that starts the worker's download cycle.
|
||||||
|
*/
|
||||||
|
protected abstract ApiCall createApiCallForDownloadCycle();
|
||||||
|
|
||||||
|
MailboxDownloadWorker(
|
||||||
|
ConnectivityChecker connectivityChecker,
|
||||||
|
TorReachabilityMonitor torReachabilityMonitor,
|
||||||
|
MailboxApiCaller mailboxApiCaller,
|
||||||
|
MailboxApi mailboxApi,
|
||||||
|
MailboxFileManager mailboxFileManager,
|
||||||
|
MailboxProperties mailboxProperties) {
|
||||||
|
this.connectivityChecker = connectivityChecker;
|
||||||
|
this.torReachabilityMonitor = torReachabilityMonitor;
|
||||||
|
this.mailboxApiCaller = mailboxApiCaller;
|
||||||
|
this.mailboxApi = mailboxApi;
|
||||||
|
this.mailboxFileManager = mailboxFileManager;
|
||||||
|
this.mailboxProperties = mailboxProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
LOG.info("Started");
|
||||||
|
synchronized (lock) {
|
||||||
|
// Don't allow the worker to be reused
|
||||||
|
if (state != State.CREATED) return;
|
||||||
|
state = State.CONNECTIVITY_CHECK;
|
||||||
|
}
|
||||||
|
// Avoid leaking observer in case destroy() is called concurrently
|
||||||
|
// before observer is added
|
||||||
|
connectivityChecker.checkConnectivity(mailboxProperties, this);
|
||||||
|
boolean destroyed;
|
||||||
|
synchronized (lock) {
|
||||||
|
destroyed = state == State.DESTROYED;
|
||||||
|
}
|
||||||
|
if (destroyed) connectivityChecker.removeObserver(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
LOG.info("Destroyed");
|
||||||
|
Cancellable apiCall;
|
||||||
|
synchronized (lock) {
|
||||||
|
state = State.DESTROYED;
|
||||||
|
apiCall = this.apiCall;
|
||||||
|
this.apiCall = null;
|
||||||
|
}
|
||||||
|
if (apiCall != null) apiCall.cancel();
|
||||||
|
connectivityChecker.removeObserver(this);
|
||||||
|
torReachabilityMonitor.removeObserver(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConnectivityCheckSucceeded() {
|
||||||
|
LOG.info("Connectivity check succeeded");
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.CONNECTIVITY_CHECK) return;
|
||||||
|
state = State.DOWNLOAD_CYCLE_1;
|
||||||
|
// Start first download cycle
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
createApiCallForDownloadCycle());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void onDownloadCycleFinished() {
|
||||||
|
boolean addObserver = false;
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DOWNLOAD_CYCLE_1) {
|
||||||
|
LOG.info("First download cycle finished");
|
||||||
|
state = State.WAITING_FOR_TOR;
|
||||||
|
apiCall = null;
|
||||||
|
addObserver = true;
|
||||||
|
} else if (state == State.DOWNLOAD_CYCLE_2) {
|
||||||
|
LOG.info("Second download cycle finished");
|
||||||
|
state = State.FINISHED;
|
||||||
|
apiCall = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (addObserver) {
|
||||||
|
// Avoid leaking observer in case destroy() is called concurrently
|
||||||
|
// before observer is added
|
||||||
|
torReachabilityMonitor.addOneShotObserver(this);
|
||||||
|
boolean destroyed;
|
||||||
|
synchronized (lock) {
|
||||||
|
destroyed = state == State.DESTROYED;
|
||||||
|
}
|
||||||
|
if (destroyed) torReachabilityMonitor.removeObserver(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void downloadNextFile(Queue<FolderFile> queue) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
FolderFile file = queue.remove();
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
new SimpleApiCall(() -> apiCallDownloadFile(file, queue)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void apiCallDownloadFile(FolderFile file, Queue<FolderFile> queue)
|
||||||
|
throws IOException, ApiException {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
}
|
||||||
|
LOG.info("Downloading file");
|
||||||
|
File tempFile = mailboxFileManager.createTempFileForDownload();
|
||||||
|
try {
|
||||||
|
mailboxApi.getFile(mailboxProperties, file.folderId, file.fileId,
|
||||||
|
tempFile);
|
||||||
|
} catch (IOException | ApiException e) {
|
||||||
|
if (!tempFile.delete()) {
|
||||||
|
LOG.warning("Failed to delete temporary file");
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
mailboxFileManager.handleDownloadedFile(tempFile);
|
||||||
|
deleteFile(file, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteFile(FolderFile file, Queue<FolderFile> queue) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
new SimpleApiCall(() -> apiCallDeleteFile(file, queue)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void apiCallDeleteFile(FolderFile file, Queue<FolderFile> queue)
|
||||||
|
throws IOException, ApiException {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
mailboxApi.deleteFile(mailboxProperties, file.folderId,
|
||||||
|
file.fileId);
|
||||||
|
} catch (TolerableFailureException e) {
|
||||||
|
// Catch this so we can continue to the next file
|
||||||
|
logException(LOG, INFO, e);
|
||||||
|
}
|
||||||
|
if (queue.isEmpty()) {
|
||||||
|
// Check for files again, as new files may have arrived while we
|
||||||
|
// were downloading
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
createApiCallForDownloadCycle());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
downloadNextFile(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTorReachable() {
|
||||||
|
LOG.info("Our Tor hidden service is reachable");
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.WAITING_FOR_TOR) return;
|
||||||
|
state = State.DOWNLOAD_CYCLE_2;
|
||||||
|
// Start second download cycle
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
createApiCallForDownloadCycle());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Package access for testing
|
||||||
|
static class FolderFile {
|
||||||
|
|
||||||
|
final MailboxFolderId folderId;
|
||||||
|
final MailboxFileId fileId;
|
||||||
|
|
||||||
|
FolderFile(MailboxFolderId folderId, MailboxFileId fileId) {
|
||||||
|
this.folderId = folderId;
|
||||||
|
this.fileId = fileId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,141 @@
|
|||||||
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
|
import static java.util.Collections.shuffle;
|
||||||
|
import static java.util.logging.Level.INFO;
|
||||||
|
|
||||||
|
@ThreadSafe
|
||||||
|
@NotNullByDefault
|
||||||
|
class OwnMailboxDownloadWorker extends MailboxDownloadWorker {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of files that will be downloaded before checking
|
||||||
|
* again for folders with available files. This ensures that if a file
|
||||||
|
* arrives during a download cycle, its folder will be checked within a
|
||||||
|
* reasonable amount of time even if some other folder has a very large
|
||||||
|
* number of files.
|
||||||
|
* <p>
|
||||||
|
* Package access for testing.
|
||||||
|
*/
|
||||||
|
static final int MAX_ROUND_ROBIN_FILES = 1000;
|
||||||
|
|
||||||
|
OwnMailboxDownloadWorker(
|
||||||
|
ConnectivityChecker connectivityChecker,
|
||||||
|
TorReachabilityMonitor torReachabilityMonitor,
|
||||||
|
MailboxApiCaller mailboxApiCaller,
|
||||||
|
MailboxApi mailboxApi,
|
||||||
|
MailboxFileManager mailboxFileManager,
|
||||||
|
MailboxProperties mailboxProperties) {
|
||||||
|
super(connectivityChecker, torReachabilityMonitor, mailboxApiCaller,
|
||||||
|
mailboxApi, mailboxFileManager, mailboxProperties);
|
||||||
|
if (!mailboxProperties.isOwner()) throw new IllegalArgumentException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ApiCall createApiCallForDownloadCycle() {
|
||||||
|
return new SimpleApiCall(this::apiCallListFolders);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void apiCallListFolders() throws IOException, ApiException {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
}
|
||||||
|
LOG.info("Listing folders with available files");
|
||||||
|
List<MailboxFolderId> folders =
|
||||||
|
mailboxApi.getFolders(mailboxProperties);
|
||||||
|
if (folders.isEmpty()) onDownloadCycleFinished();
|
||||||
|
else listNextFolder(new LinkedList<>(folders), new HashMap<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the next folder from `queue` and starts a task to list the
|
||||||
|
* files in the folder and add them to `available`.
|
||||||
|
*/
|
||||||
|
private void listNextFolder(Queue<MailboxFolderId> queue,
|
||||||
|
Map<MailboxFolderId, Queue<MailboxFile>> available) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
MailboxFolderId folder = queue.remove();
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(new SimpleApiCall(() ->
|
||||||
|
apiCallListFolder(folder, queue, available)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void apiCallListFolder(MailboxFolderId folder,
|
||||||
|
Queue<MailboxFolderId> queue,
|
||||||
|
Map<MailboxFolderId, Queue<MailboxFile>> available)
|
||||||
|
throws IOException, ApiException {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
}
|
||||||
|
LOG.info("Listing folder");
|
||||||
|
List<MailboxFile> files =
|
||||||
|
mailboxApi.getFiles(mailboxProperties, folder);
|
||||||
|
if (!files.isEmpty()) available.put(folder, new LinkedList<>(files));
|
||||||
|
if (queue.isEmpty()) {
|
||||||
|
LOG.info("Finished listing folders");
|
||||||
|
if (available.isEmpty()) onDownloadCycleFinished();
|
||||||
|
else createDownloadQueue(available);
|
||||||
|
} else {
|
||||||
|
listNextFolder(queue, available);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Visits the given folders in round-robin order to create a queue of up to
|
||||||
|
* {@link #MAX_ROUND_ROBIN_FILES} to download.
|
||||||
|
*/
|
||||||
|
private void createDownloadQueue(
|
||||||
|
Map<MailboxFolderId, Queue<MailboxFile>> available) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
}
|
||||||
|
if (LOG.isLoggable(INFO)) {
|
||||||
|
LOG.info(available.size() + " folders have available files");
|
||||||
|
}
|
||||||
|
Queue<FolderFile> queue = createRoundRobinQueue(available);
|
||||||
|
if (LOG.isLoggable(INFO)) {
|
||||||
|
LOG.info("Downloading " + queue.size() + " files");
|
||||||
|
}
|
||||||
|
downloadNextFile(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Package access for testing
|
||||||
|
Queue<FolderFile> createRoundRobinQueue(
|
||||||
|
Map<MailboxFolderId, Queue<MailboxFile>> available) {
|
||||||
|
List<MailboxFolderId> roundRobin = new ArrayList<>(available.keySet());
|
||||||
|
// Shuffle the folders so we don't always favour the same folders
|
||||||
|
shuffle(roundRobin);
|
||||||
|
Queue<FolderFile> queue = new LinkedList<>();
|
||||||
|
while (queue.size() < MAX_ROUND_ROBIN_FILES && !available.isEmpty()) {
|
||||||
|
Iterator<MailboxFolderId> it = roundRobin.iterator();
|
||||||
|
while (queue.size() < MAX_ROUND_ROBIN_FILES && it.hasNext()) {
|
||||||
|
MailboxFolderId folder = it.next();
|
||||||
|
Queue<MailboxFile> files = available.get(folder);
|
||||||
|
MailboxFile file = files.remove();
|
||||||
|
queue.add(new FolderFile(folder, file.name));
|
||||||
|
if (files.isEmpty()) {
|
||||||
|
available.remove(folder);
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,88 +1,68 @@
|
|||||||
package org.briarproject.bramble.mailbox;
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
import org.briarproject.bramble.api.Cancellable;
|
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
|
||||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
|
||||||
import org.briarproject.bramble.test.CaptureArgumentAction;
|
|
||||||
import org.jmock.Expectations;
|
|
||||||
import org.jmock.lib.action.DoAllAction;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static java.util.Arrays.asList;
|
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
|
||||||
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
|
||||||
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
|
||||||
import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
|
import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
|
||||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
|
||||||
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
public class ContactMailboxDownloadWorkerTest
|
||||||
|
extends MailboxDownloadWorkerTest<ContactMailboxDownloadWorker> {
|
||||||
|
|
||||||
private final ConnectivityChecker connectivityChecker =
|
public ContactMailboxDownloadWorkerTest() {
|
||||||
context.mock(ConnectivityChecker.class);
|
mailboxProperties = getMailboxProperties(false, CLIENT_SUPPORTS);
|
||||||
private final TorReachabilityMonitor torReachabilityMonitor =
|
|
||||||
context.mock(TorReachabilityMonitor.class);
|
|
||||||
private final MailboxApiCaller mailboxApiCaller =
|
|
||||||
context.mock(MailboxApiCaller.class);
|
|
||||||
private final MailboxApi mailboxApi = context.mock(MailboxApi.class);
|
|
||||||
private final MailboxFileManager mailboxFileManager =
|
|
||||||
context.mock(MailboxFileManager.class);
|
|
||||||
private final Cancellable apiCall = context.mock(Cancellable.class);
|
|
||||||
|
|
||||||
private final MailboxProperties mailboxProperties =
|
|
||||||
getMailboxProperties(false, CLIENT_SUPPORTS);
|
|
||||||
private final long now = System.currentTimeMillis();
|
|
||||||
private final MailboxFile file1 =
|
|
||||||
new MailboxFile(new MailboxFileId(getRandomId()), now - 1);
|
|
||||||
private final MailboxFile file2 =
|
|
||||||
new MailboxFile(new MailboxFileId(getRandomId()), now);
|
|
||||||
private final List<MailboxFile> files = asList(file1, file2);
|
|
||||||
|
|
||||||
private File testDir, tempFile;
|
|
||||||
private ContactMailboxDownloadWorker worker;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() {
|
|
||||||
testDir = getTestDirectory();
|
|
||||||
tempFile = new File(testDir, "temp");
|
|
||||||
worker = new ContactMailboxDownloadWorker(connectivityChecker,
|
worker = new ContactMailboxDownloadWorker(connectivityChecker,
|
||||||
torReachabilityMonitor, mailboxApiCaller, mailboxApi,
|
torReachabilityMonitor, mailboxApiCaller, mailboxApi,
|
||||||
mailboxFileManager, mailboxProperties);
|
mailboxFileManager, mailboxProperties);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() {
|
|
||||||
deleteTestDirectory(testDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChecksConnectivityWhenStartedAndRemovesObserverWhenDestroyed() {
|
public void testChecksConnectivityWhenStartedAndRemovesObserverWhenDestroyed() {
|
||||||
// When the worker is started it should start a connectivity check
|
// When the worker is started it should start a connectivity check
|
||||||
context.checking(new Expectations() {{
|
expectStartConnectivityCheck();
|
||||||
oneOf(connectivityChecker).checkConnectivity(
|
|
||||||
with(mailboxProperties), with(worker));
|
|
||||||
}});
|
|
||||||
|
|
||||||
worker.start();
|
worker.start();
|
||||||
|
|
||||||
// When the worker is destroyed it should remove the connectivity
|
// When the worker is destroyed it should remove the connectivity
|
||||||
// and reachability observers
|
// and reachability observers
|
||||||
context.checking(new Expectations() {{
|
expectRemoveObservers();
|
||||||
oneOf(connectivityChecker).removeObserver(worker);
|
worker.destroy();
|
||||||
oneOf(torReachabilityMonitor).removeObserver(worker);
|
}
|
||||||
}});
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChecksForFilesWhenConnectivityCheckSucceeds()
|
||||||
|
throws Exception {
|
||||||
|
// When the worker is started it should start a connectivity check
|
||||||
|
expectStartConnectivityCheck();
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the connectivity check succeeds, a list-inbox task should be
|
||||||
|
// started for the first download cycle
|
||||||
|
AtomicReference<ApiCall> listTask = new AtomicReference<>();
|
||||||
|
expectStartTask(listTask);
|
||||||
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
|
// When the list-inbox tasks runs and finds no files to download,
|
||||||
|
// it should add a Tor reachability observer
|
||||||
|
expectCheckForFiles(mailboxProperties.getInboxId(), emptyList());
|
||||||
|
expectAddReachabilityObserver();
|
||||||
|
assertFalse(listTask.get().callApi());
|
||||||
|
|
||||||
|
// When the reachability observer is called, a list-inbox task should
|
||||||
|
// be started for the second download cycle
|
||||||
|
expectStartTask(listTask);
|
||||||
|
worker.onTorReachable();
|
||||||
|
|
||||||
|
// When the list-inbox tasks runs and finds no files to download,
|
||||||
|
// it should finish the second download cycle
|
||||||
|
expectCheckForFiles(mailboxProperties.getInboxId(), emptyList());
|
||||||
|
assertFalse(listTask.get().callApi());
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// and reachability observers
|
||||||
|
expectRemoveObservers();
|
||||||
worker.destroy();
|
worker.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,150 +70,67 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
public void testDownloadsFilesWhenConnectivityCheckSucceeds()
|
public void testDownloadsFilesWhenConnectivityCheckSucceeds()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
// When the worker is started it should start a connectivity check
|
// When the worker is started it should start a connectivity check
|
||||||
context.checking(new Expectations() {{
|
expectStartConnectivityCheck();
|
||||||
oneOf(connectivityChecker).checkConnectivity(
|
|
||||||
with(mailboxProperties), with(worker));
|
|
||||||
}});
|
|
||||||
|
|
||||||
worker.start();
|
worker.start();
|
||||||
|
|
||||||
// When the connectivity check succeeds, a list-inbox task should be
|
// When the connectivity check succeeds, a list-inbox task should be
|
||||||
// started for the first download cycle
|
// started for the first download cycle
|
||||||
AtomicReference<ApiCall> listTask = new AtomicReference<>();
|
AtomicReference<ApiCall> listTask = new AtomicReference<>();
|
||||||
context.checking(new Expectations() {{
|
expectStartTask(listTask);
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
|
||||||
will(new DoAllAction(
|
|
||||||
new CaptureArgumentAction<>(listTask, ApiCall.class, 0),
|
|
||||||
returnValue(apiCall)
|
|
||||||
));
|
|
||||||
}});
|
|
||||||
|
|
||||||
worker.onConnectivityCheckSucceeded();
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
// When the list-inbox tasks runs and finds some files to download,
|
// When the list-inbox tasks runs and finds some files to download,
|
||||||
// it should start a download task for the first file
|
// it should start a download task for the first file
|
||||||
AtomicReference<ApiCall> downloadTask = new AtomicReference<>();
|
AtomicReference<ApiCall> downloadTask = new AtomicReference<>();
|
||||||
context.checking(new Expectations() {{
|
expectCheckForFiles(mailboxProperties.getInboxId(), files);
|
||||||
oneOf(mailboxApi).getFiles(mailboxProperties,
|
expectStartTask(downloadTask);
|
||||||
requireNonNull(mailboxProperties.getInboxId()));
|
|
||||||
will(returnValue(files));
|
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
|
||||||
will(new DoAllAction(
|
|
||||||
new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0),
|
|
||||||
returnValue(apiCall)
|
|
||||||
));
|
|
||||||
}});
|
|
||||||
|
|
||||||
assertFalse(listTask.get().callApi());
|
assertFalse(listTask.get().callApi());
|
||||||
|
|
||||||
// When the first download task runs it should download the file to the
|
// When the first download task runs it should download the file to the
|
||||||
// location provided by the file manager and start a delete task
|
// location provided by the file manager and start a delete task
|
||||||
AtomicReference<ApiCall> deleteTask = new AtomicReference<>();
|
AtomicReference<ApiCall> deleteTask = new AtomicReference<>();
|
||||||
context.checking(new Expectations() {{
|
expectDownloadFile(mailboxProperties.getInboxId(), file1);
|
||||||
oneOf(mailboxFileManager).createTempFileForDownload();
|
expectStartTask(deleteTask);
|
||||||
will(returnValue(tempFile));
|
|
||||||
oneOf(mailboxApi).getFile(mailboxProperties,
|
|
||||||
requireNonNull(mailboxProperties.getInboxId()),
|
|
||||||
file1.name, tempFile);
|
|
||||||
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
|
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
|
||||||
will(new DoAllAction(
|
|
||||||
new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0),
|
|
||||||
returnValue(apiCall)
|
|
||||||
));
|
|
||||||
}});
|
|
||||||
|
|
||||||
assertFalse(downloadTask.get().callApi());
|
assertFalse(downloadTask.get().callApi());
|
||||||
|
|
||||||
// When the first delete task runs it should delete the file, ignore
|
// When the first delete task runs it should delete the file, ignore
|
||||||
// the tolerable failure, and start a download task for the next file
|
// the tolerable failure, and start a download task for the next file
|
||||||
context.checking(new Expectations() {{
|
expectDeleteFile(mailboxProperties.getInboxId(), file1, true);
|
||||||
oneOf(mailboxApi).deleteFile(mailboxProperties,
|
expectStartTask(downloadTask);
|
||||||
requireNonNull(mailboxProperties.getInboxId()), file1.name);
|
|
||||||
will(throwException(new TolerableFailureException()));
|
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
|
||||||
will(new DoAllAction(
|
|
||||||
new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0),
|
|
||||||
returnValue(apiCall)
|
|
||||||
));
|
|
||||||
}});
|
|
||||||
|
|
||||||
assertFalse(deleteTask.get().callApi());
|
assertFalse(deleteTask.get().callApi());
|
||||||
|
|
||||||
// When the second download task runs it should download the file to
|
// When the second download task runs it should download the file to
|
||||||
// the location provided by the file manager and start a delete task
|
// the location provided by the file manager and start a delete task
|
||||||
context.checking(new Expectations() {{
|
expectDownloadFile(mailboxProperties.getInboxId(), file2);
|
||||||
oneOf(mailboxFileManager).createTempFileForDownload();
|
expectStartTask(deleteTask);
|
||||||
will(returnValue(tempFile));
|
|
||||||
oneOf(mailboxApi).getFile(mailboxProperties,
|
|
||||||
requireNonNull(mailboxProperties.getInboxId()),
|
|
||||||
file2.name, tempFile);
|
|
||||||
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
|
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
|
||||||
will(new DoAllAction(
|
|
||||||
new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0),
|
|
||||||
returnValue(apiCall)
|
|
||||||
));
|
|
||||||
}});
|
|
||||||
|
|
||||||
assertFalse(downloadTask.get().callApi());
|
assertFalse(downloadTask.get().callApi());
|
||||||
|
|
||||||
// When the second delete task runs it should delete the file and
|
// When the second delete task runs it should delete the file and
|
||||||
// start a list-inbox task to check for files that may have arrived
|
// start a list-inbox task to check for files that may have arrived
|
||||||
// since the first download cycle started
|
// since the first download cycle started
|
||||||
context.checking(new Expectations() {{
|
expectDeleteFile(mailboxProperties.getInboxId(), file2, false);
|
||||||
oneOf(mailboxApi).deleteFile(mailboxProperties,
|
expectStartTask(listTask);
|
||||||
requireNonNull(mailboxProperties.getInboxId()), file2.name);
|
|
||||||
will(throwException(new TolerableFailureException()));
|
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
|
||||||
will(new DoAllAction(
|
|
||||||
new CaptureArgumentAction<>(listTask, ApiCall.class, 0),
|
|
||||||
returnValue(apiCall)
|
|
||||||
));
|
|
||||||
}});
|
|
||||||
|
|
||||||
assertFalse(deleteTask.get().callApi());
|
assertFalse(deleteTask.get().callApi());
|
||||||
|
|
||||||
// When the list-inbox tasks runs and finds no more files to download,
|
// When the list-inbox tasks runs and finds no more files to download,
|
||||||
// it should add a Tor reachability observer
|
// it should add a Tor reachability observer
|
||||||
context.checking(new Expectations() {{
|
expectCheckForFiles(mailboxProperties.getInboxId(), emptyList());
|
||||||
oneOf(mailboxApi).getFiles(mailboxProperties,
|
expectAddReachabilityObserver();
|
||||||
requireNonNull(mailboxProperties.getInboxId()));
|
|
||||||
will(returnValue(emptyList()));
|
|
||||||
oneOf(torReachabilityMonitor).addOneShotObserver(worker);
|
|
||||||
}});
|
|
||||||
|
|
||||||
assertFalse(listTask.get().callApi());
|
assertFalse(listTask.get().callApi());
|
||||||
|
|
||||||
// When the reachability observer is called, a list-inbox task should
|
// When the reachability observer is called, a list-inbox task should
|
||||||
// be started for the second download cycle
|
// be started for the second download cycle
|
||||||
context.checking(new Expectations() {{
|
expectStartTask(listTask);
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
|
||||||
will(new DoAllAction(
|
|
||||||
new CaptureArgumentAction<>(listTask, ApiCall.class, 0),
|
|
||||||
returnValue(apiCall)
|
|
||||||
));
|
|
||||||
}});
|
|
||||||
|
|
||||||
worker.onTorReachable();
|
worker.onTorReachable();
|
||||||
|
|
||||||
// When the list-inbox tasks runs and finds no more files to download,
|
// When the list-inbox tasks runs and finds no more files to download,
|
||||||
// it should finish the second download cycle
|
// it should finish the second download cycle
|
||||||
context.checking(new Expectations() {{
|
expectCheckForFiles(mailboxProperties.getInboxId(), emptyList());
|
||||||
oneOf(mailboxApi).getFiles(mailboxProperties,
|
|
||||||
requireNonNull(mailboxProperties.getInboxId()));
|
|
||||||
will(returnValue(emptyList()));
|
|
||||||
}});
|
|
||||||
|
|
||||||
assertFalse(listTask.get().callApi());
|
assertFalse(listTask.get().callApi());
|
||||||
|
|
||||||
// When the worker is destroyed it should remove the connectivity
|
// When the worker is destroyed it should remove the connectivity
|
||||||
// and reachability observers
|
// and reachability observers
|
||||||
context.checking(new Expectations() {{
|
expectRemoveObservers();
|
||||||
oneOf(connectivityChecker).removeObserver(worker);
|
|
||||||
oneOf(torReachabilityMonitor).removeObserver(worker);
|
|
||||||
}});
|
|
||||||
|
|
||||||
worker.destroy();
|
worker.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,130 @@
|
|||||||
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.Cancellable;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
||||||
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
|
import org.briarproject.bramble.test.CaptureArgumentAction;
|
||||||
|
import org.jmock.Expectations;
|
||||||
|
import org.jmock.lib.action.DoAllAction;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static java.util.Arrays.asList;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
|
||||||
|
|
||||||
|
abstract class MailboxDownloadWorkerTest<W extends MailboxDownloadWorker>
|
||||||
|
extends BrambleMockTestCase {
|
||||||
|
|
||||||
|
final ConnectivityChecker connectivityChecker =
|
||||||
|
context.mock(ConnectivityChecker.class);
|
||||||
|
final TorReachabilityMonitor torReachabilityMonitor =
|
||||||
|
context.mock(TorReachabilityMonitor.class);
|
||||||
|
final MailboxApiCaller mailboxApiCaller =
|
||||||
|
context.mock(MailboxApiCaller.class);
|
||||||
|
final MailboxApi mailboxApi = context.mock(MailboxApi.class);
|
||||||
|
final MailboxFileManager mailboxFileManager =
|
||||||
|
context.mock(MailboxFileManager.class);
|
||||||
|
private final Cancellable apiCall = context.mock(Cancellable.class);
|
||||||
|
|
||||||
|
private final long now = System.currentTimeMillis();
|
||||||
|
final MailboxFile file1 =
|
||||||
|
new MailboxFile(new MailboxFileId(getRandomId()), now - 1);
|
||||||
|
final MailboxFile file2 =
|
||||||
|
new MailboxFile(new MailboxFileId(getRandomId()), now);
|
||||||
|
final List<MailboxFile> files = asList(file1, file2);
|
||||||
|
|
||||||
|
private File testDir, tempFile;
|
||||||
|
MailboxProperties mailboxProperties;
|
||||||
|
W worker;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
testDir = getTestDirectory();
|
||||||
|
tempFile = new File(testDir, "temp");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
deleteTestDirectory(testDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void expectStartConnectivityCheck() {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(connectivityChecker).checkConnectivity(
|
||||||
|
with(mailboxProperties), with(worker));
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
void expectStartTask(AtomicReference<ApiCall> task) {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(task, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
void expectCheckForFoldersWithAvailableFiles(
|
||||||
|
List<MailboxFolderId> folderIds) throws Exception {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxApi).getFolders(mailboxProperties);
|
||||||
|
will(returnValue(folderIds));
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
void expectCheckForFiles(MailboxFolderId folderId,
|
||||||
|
List<MailboxFile> files) throws Exception {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxApi).getFiles(mailboxProperties, folderId);
|
||||||
|
will(returnValue(files));
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
void expectDownloadFile(MailboxFolderId folderId,
|
||||||
|
MailboxFile file)
|
||||||
|
throws Exception {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxFileManager).createTempFileForDownload();
|
||||||
|
will(returnValue(tempFile));
|
||||||
|
oneOf(mailboxApi).getFile(mailboxProperties, folderId, file.name,
|
||||||
|
tempFile);
|
||||||
|
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
void expectDeleteFile(MailboxFolderId folderId, MailboxFile file,
|
||||||
|
boolean tolerableFailure) throws Exception {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxApi).deleteFile(mailboxProperties, folderId,
|
||||||
|
file.name);
|
||||||
|
if (tolerableFailure) {
|
||||||
|
will(throwException(new TolerableFailureException()));
|
||||||
|
}
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
void expectAddReachabilityObserver() {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(torReachabilityMonitor).addOneShotObserver(worker);
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
void expectRemoveObservers() {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(connectivityChecker).removeObserver(worker);
|
||||||
|
oneOf(torReachabilityMonitor).removeObserver(worker);
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,236 @@
|
|||||||
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static java.util.Arrays.asList;
|
||||||
|
import static java.util.Collections.emptyList;
|
||||||
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
|
||||||
|
import static org.briarproject.bramble.mailbox.MailboxDownloadWorker.FolderFile;
|
||||||
|
import static org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.MAX_ROUND_ROBIN_FILES;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
|
public class OwnMailboxDownloadWorkerTest
|
||||||
|
extends MailboxDownloadWorkerTest<OwnMailboxDownloadWorker> {
|
||||||
|
|
||||||
|
private final MailboxFolderId folderId1 =
|
||||||
|
new MailboxFolderId(getRandomId());
|
||||||
|
private final MailboxFolderId folderId2 =
|
||||||
|
new MailboxFolderId(getRandomId());
|
||||||
|
private final List<MailboxFolderId> folderIds =
|
||||||
|
asList(folderId1, folderId2);
|
||||||
|
|
||||||
|
public OwnMailboxDownloadWorkerTest() {
|
||||||
|
mailboxProperties = getMailboxProperties(true, CLIENT_SUPPORTS);
|
||||||
|
worker = new OwnMailboxDownloadWorker(connectivityChecker,
|
||||||
|
torReachabilityMonitor, mailboxApiCaller, mailboxApi,
|
||||||
|
mailboxFileManager, mailboxProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() {
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChecksConnectivityWhenStartedAndRemovesObserverWhenDestroyed() {
|
||||||
|
// When the worker is started it should start a connectivity check
|
||||||
|
expectStartConnectivityCheck();
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// and reachability observers
|
||||||
|
expectRemoveObservers();
|
||||||
|
worker.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChecksForFilesWhenConnectivityCheckSucceeds()
|
||||||
|
throws Exception {
|
||||||
|
// When the worker is started it should start a connectivity check
|
||||||
|
expectStartConnectivityCheck();
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the connectivity check succeeds, a list-folders task should be
|
||||||
|
// started for the first download cycle
|
||||||
|
AtomicReference<ApiCall> listFoldersTask = new AtomicReference<>();
|
||||||
|
expectStartTask(listFoldersTask);
|
||||||
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
|
// When the list-folders tasks runs and finds no folders with files
|
||||||
|
// to download, it should add a Tor reachability observer
|
||||||
|
expectCheckForFoldersWithAvailableFiles(emptyList());
|
||||||
|
expectAddReachabilityObserver();
|
||||||
|
assertFalse(listFoldersTask.get().callApi());
|
||||||
|
|
||||||
|
// When the reachability observer is called, a list-folders task should
|
||||||
|
// be started for the second download cycle
|
||||||
|
expectStartTask(listFoldersTask);
|
||||||
|
worker.onTorReachable();
|
||||||
|
|
||||||
|
// When the list-folders tasks runs and finds no folders with files
|
||||||
|
// to download, it should finish the second download cycle
|
||||||
|
expectCheckForFoldersWithAvailableFiles(emptyList());
|
||||||
|
assertFalse(listFoldersTask.get().callApi());
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// and reachability observers
|
||||||
|
expectRemoveObservers();
|
||||||
|
worker.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDownloadsFilesWhenConnectivityCheckSucceeds()
|
||||||
|
throws Exception {
|
||||||
|
// When the worker is started it should start a connectivity check
|
||||||
|
expectStartConnectivityCheck();
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the connectivity check succeeds, a list-folders task should be
|
||||||
|
// started for the first download cycle
|
||||||
|
AtomicReference<ApiCall> listFoldersTask = new AtomicReference<>();
|
||||||
|
expectStartTask(listFoldersTask);
|
||||||
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
|
// When the list-folders tasks runs and finds some folders with files
|
||||||
|
// to download, it should start a list-files task for the first folder
|
||||||
|
AtomicReference<ApiCall> listFilesTask = new AtomicReference<>();
|
||||||
|
expectCheckForFoldersWithAvailableFiles(folderIds);
|
||||||
|
expectStartTask(listFilesTask);
|
||||||
|
assertFalse(listFoldersTask.get().callApi());
|
||||||
|
|
||||||
|
// When the first list-files task runs and finds no files to download,
|
||||||
|
// it should start a second list-files task for the next folder
|
||||||
|
expectCheckForFiles(folderId1, emptyList());
|
||||||
|
expectStartTask(listFilesTask);
|
||||||
|
assertFalse(listFilesTask.get().callApi());
|
||||||
|
|
||||||
|
// When the second list-files task runs and finds some files to
|
||||||
|
// download, it should create the round-robin queue and start a
|
||||||
|
// download task for the first file
|
||||||
|
AtomicReference<ApiCall> downloadTask = new AtomicReference<>();
|
||||||
|
expectCheckForFiles(folderId2, files);
|
||||||
|
expectStartTask(downloadTask);
|
||||||
|
assertFalse(listFilesTask.get().callApi());
|
||||||
|
|
||||||
|
// When the first download task runs it should download the file to the
|
||||||
|
// location provided by the file manager and start a delete task
|
||||||
|
AtomicReference<ApiCall> deleteTask = new AtomicReference<>();
|
||||||
|
expectDownloadFile(folderId2, file1);
|
||||||
|
expectStartTask(deleteTask);
|
||||||
|
assertFalse(downloadTask.get().callApi());
|
||||||
|
|
||||||
|
// When the first delete task runs it should delete the file, ignore
|
||||||
|
// the tolerable failure, and start a download task for the next file
|
||||||
|
expectDeleteFile(folderId2, file1, true); // Delete fails tolerably
|
||||||
|
expectStartTask(downloadTask);
|
||||||
|
assertFalse(deleteTask.get().callApi());
|
||||||
|
|
||||||
|
// When the second download task runs it should download the file to
|
||||||
|
// the location provided by the file manager and start a delete task
|
||||||
|
expectDownloadFile(folderId2, file2);
|
||||||
|
expectStartTask(deleteTask);
|
||||||
|
assertFalse(downloadTask.get().callApi());
|
||||||
|
|
||||||
|
// When the second delete task runs it should delete the file and
|
||||||
|
// start a list-inbox task to check for files that may have arrived
|
||||||
|
// since the first download cycle started
|
||||||
|
expectDeleteFile(folderId2, file2, false); // Delete succeeds
|
||||||
|
expectStartTask(listFoldersTask);
|
||||||
|
assertFalse(deleteTask.get().callApi());
|
||||||
|
|
||||||
|
// When the list-inbox tasks runs and finds no more files to download,
|
||||||
|
// it should add a Tor reachability observer
|
||||||
|
expectCheckForFoldersWithAvailableFiles(emptyList());
|
||||||
|
expectAddReachabilityObserver();
|
||||||
|
assertFalse(listFoldersTask.get().callApi());
|
||||||
|
|
||||||
|
// When the reachability observer is called, a list-inbox task should
|
||||||
|
// be started for the second download cycle
|
||||||
|
expectStartTask(listFoldersTask);
|
||||||
|
worker.onTorReachable();
|
||||||
|
|
||||||
|
// When the list-inbox tasks runs and finds no more files to download,
|
||||||
|
// it should finish the second download cycle
|
||||||
|
expectCheckForFoldersWithAvailableFiles(emptyList());
|
||||||
|
assertFalse(listFoldersTask.get().callApi());
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// and reachability observers
|
||||||
|
expectRemoveObservers();
|
||||||
|
worker.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRoundRobinQueueVisitsAllFolders() {
|
||||||
|
// Ten folders with two files each
|
||||||
|
Map<MailboxFolderId, Queue<MailboxFile>> available =
|
||||||
|
createAvailableFiles(10, 2);
|
||||||
|
Queue<FolderFile> queue = worker.createRoundRobinQueue(available);
|
||||||
|
// Check that all files were queued
|
||||||
|
for (MailboxFolderId folderId : available.keySet()) {
|
||||||
|
assertEquals(2, countFilesWithFolderId(queue, folderId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSizeOfRoundRobinQueueIsLimited() {
|
||||||
|
// Two folders with MAX_ROUND_ROBIN_FILES each
|
||||||
|
Map<MailboxFolderId, Queue<MailboxFile>> available =
|
||||||
|
createAvailableFiles(2, MAX_ROUND_ROBIN_FILES);
|
||||||
|
Queue<FolderFile> queue = worker.createRoundRobinQueue(available);
|
||||||
|
// Check that half the files in each folder were queued
|
||||||
|
for (MailboxFolderId folderId : available.keySet()) {
|
||||||
|
assertEquals(MAX_ROUND_ROBIN_FILES / 2,
|
||||||
|
countFilesWithFolderId(queue, folderId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<MailboxFolderId, Queue<MailboxFile>> createAvailableFiles(
|
||||||
|
int numFolders, int numFiles) {
|
||||||
|
Map<MailboxFolderId, Queue<MailboxFile>> available = new HashMap<>();
|
||||||
|
List<MailboxFolderId> folderIds = createFolderIds(numFolders);
|
||||||
|
for (MailboxFolderId folderId : folderIds) {
|
||||||
|
available.put(folderId, createFiles(numFiles));
|
||||||
|
}
|
||||||
|
return available;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<MailboxFolderId> createFolderIds(int size) {
|
||||||
|
List<MailboxFolderId> folderIds = new ArrayList<>(size);
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
folderIds.add(new MailboxFolderId(getRandomId()));
|
||||||
|
}
|
||||||
|
return folderIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Queue<MailboxFile> createFiles(int size) {
|
||||||
|
Queue<MailboxFile> files = new LinkedList<>();
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
files.add(new MailboxFile(new MailboxFileId(getRandomId()), i));
|
||||||
|
}
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int countFilesWithFolderId(Queue<FolderFile> queue,
|
||||||
|
MailboxFolderId folderId) {
|
||||||
|
int count = 0;
|
||||||
|
for (FolderFile file : queue) {
|
||||||
|
if (file.folderId.equals(folderId)) count++;
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user