Add download worker for own mailbox.

This commit is contained in:
akwizgran
2022-07-15 15:19:44 +01:00
parent 913e5da2f5
commit 9764aba47d
2 changed files with 680 additions and 0 deletions

View File

@@ -0,0 +1,341 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.Cancellable;
import org.briarproject.bramble.api.mailbox.MailboxFileId;
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
import org.briarproject.bramble.api.mailbox.MailboxProperties;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import static java.util.Collections.shuffle;
import static java.util.logging.Level.INFO;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.util.LogUtils.logException;
@ThreadSafe
@NotNullByDefault
class OwnMailboxDownloadWorker implements MailboxWorker, ConnectivityObserver,
TorReachabilityObserver {
/**
* When the worker is started it waits for a connectivity check, then
* starts its first download cycle: checking for folders with available
* files, listing the files in each folder, downloading and deleting the
* files, and checking again until all files have been downloaded and
* deleted.
* <p>
* The worker then waits for our Tor hidden service to be reachable before
* starting its second download cycle. This ensures that if a contact
* tried and failed to connect to our hidden service before it was
* reachable, and therefore uploaded a file to the mailbox instead, we'll
* find the file in the second download cycle.
*/
private enum State {
CREATED,
CONNECTIVITY_CHECK,
DOWNLOAD_CYCLE_1,
WAITING_FOR_TOR,
DOWNLOAD_CYCLE_2,
FINISHED,
DESTROYED
}
/**
* The maximum number of files that will be downloaded before checking
* again for folders with available files. This ensures that if a file
* arrives during a download cycle, its folder will be checked within a
* reasonable amount of time even if another folder has a very large number
* of files.
* <p>
* Package access for testing.
*/
static final int MAX_ROUND_ROBIN_FILES = 1000;
private static final Logger LOG =
getLogger(OwnMailboxDownloadWorker.class.getName());
private final ConnectivityChecker connectivityChecker;
private final TorReachabilityMonitor torReachabilityMonitor;
private final MailboxApiCaller mailboxApiCaller;
private final MailboxApi mailboxApi;
private final MailboxFileManager mailboxFileManager;
private final MailboxProperties mailboxProperties;
private final Object lock = new Object();
@GuardedBy("lock")
private State state = State.CREATED;
@GuardedBy("lock")
@Nullable
private Cancellable apiCall = null;
OwnMailboxDownloadWorker(
ConnectivityChecker connectivityChecker,
TorReachabilityMonitor torReachabilityMonitor,
MailboxApiCaller mailboxApiCaller,
MailboxApi mailboxApi,
MailboxFileManager mailboxFileManager,
MailboxProperties mailboxProperties) {
if (!mailboxProperties.isOwner()) throw new IllegalArgumentException();
this.connectivityChecker = connectivityChecker;
this.torReachabilityMonitor = torReachabilityMonitor;
this.mailboxApiCaller = mailboxApiCaller;
this.mailboxApi = mailboxApi;
this.mailboxFileManager = mailboxFileManager;
this.mailboxProperties = mailboxProperties;
}
@Override
public void start() {
LOG.info("Started");
synchronized (lock) {
// Don't allow the worker to be reused
if (state != State.CREATED) return;
state = State.CONNECTIVITY_CHECK;
}
// Avoid leaking observer in case destroy() is called concurrently
// before observer is added
connectivityChecker.checkConnectivity(mailboxProperties, this);
boolean destroyed;
synchronized (lock) {
destroyed = state == State.DESTROYED;
}
if (destroyed) connectivityChecker.removeObserver(this);
}
@Override
public void destroy() {
LOG.info("Destroyed");
Cancellable apiCall;
synchronized (lock) {
state = State.DESTROYED;
apiCall = this.apiCall;
this.apiCall = null;
}
if (apiCall != null) apiCall.cancel();
connectivityChecker.removeObserver(this);
torReachabilityMonitor.removeObserver(this);
}
@Override
public void onConnectivityCheckSucceeded() {
LOG.info("Connectivity check succeeded");
synchronized (lock) {
if (state != State.CONNECTIVITY_CHECK) return;
state = State.DOWNLOAD_CYCLE_1;
// Start first download cycle
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(this::apiCallListFolders));
}
}
private void apiCallListFolders() throws IOException, ApiException {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
LOG.info("Listing folders with available files");
List<MailboxFolderId> folders =
mailboxApi.getFolders(mailboxProperties);
if (folders.isEmpty()) onDownloadCycleFinished();
else listNextFolder(new LinkedList<>(folders), new HashMap<>());
}
private void onDownloadCycleFinished() {
boolean addObserver = false;
synchronized (lock) {
if (state == State.DOWNLOAD_CYCLE_1) {
LOG.info("First download cycle finished");
state = State.WAITING_FOR_TOR;
apiCall = null;
addObserver = true;
} else if (state == State.DOWNLOAD_CYCLE_2) {
LOG.info("Second download cycle finished");
state = State.FINISHED;
apiCall = null;
}
}
if (addObserver) {
// Avoid leaking observer in case destroy() is called concurrently
// before observer is added
torReachabilityMonitor.addOneShotObserver(this);
boolean destroyed;
synchronized (lock) {
destroyed = state == State.DESTROYED;
}
if (destroyed) torReachabilityMonitor.removeObserver(this);
}
}
private void listNextFolder(Queue<MailboxFolderId> queue,
Map<MailboxFolderId, Queue<MailboxFile>> available) {
synchronized (lock) {
if (state == State.DESTROYED) return;
MailboxFolderId folder = queue.remove();
apiCall = mailboxApiCaller.retryWithBackoff(new SimpleApiCall(() ->
apiCallListFolder(folder, queue, available)));
}
}
private void apiCallListFolder(MailboxFolderId folder,
Queue<MailboxFolderId> queue,
Map<MailboxFolderId, Queue<MailboxFile>> available)
throws IOException, ApiException {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
LOG.info("Listing folder");
List<MailboxFile> files =
mailboxApi.getFiles(mailboxProperties, folder);
if (!files.isEmpty()) available.put(folder, new LinkedList<>(files));
if (queue.isEmpty()) {
LOG.info("Finished listing folders");
if (available.isEmpty()) onDownloadCycleFinished();
else createDownloadQueue(available);
} else {
listNextFolder(queue, available);
}
}
private void createDownloadQueue(
Map<MailboxFolderId, Queue<MailboxFile>> available) {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
if (LOG.isLoggable(INFO)) {
LOG.info(available.size() + " folders have available files");
}
Queue<FolderFile> queue = createRoundRobinQueue(available);
if (LOG.isLoggable(INFO)) {
LOG.info("Downloading " + queue.size() + " files");
}
downloadNextFile(queue);
}
// Package access for testing
Queue<FolderFile> createRoundRobinQueue(
Map<MailboxFolderId, Queue<MailboxFile>> available) {
List<MailboxFolderId> roundRobin = new ArrayList<>(available.keySet());
// Shuffle the folders so we don't always favour the same folders
shuffle(roundRobin);
Queue<FolderFile> queue = new LinkedList<>();
while (queue.size() < MAX_ROUND_ROBIN_FILES && !available.isEmpty()) {
Iterator<MailboxFolderId> it = roundRobin.iterator();
while (queue.size() < MAX_ROUND_ROBIN_FILES && it.hasNext()) {
MailboxFolderId folder = it.next();
Queue<MailboxFile> files = available.get(folder);
MailboxFile file = files.remove();
queue.add(new FolderFile(folder, file.name));
if (files.isEmpty()) {
available.remove(folder);
it.remove();
}
}
}
return queue;
}
private void downloadNextFile(Queue<FolderFile> queue) {
synchronized (lock) {
if (state == State.DESTROYED) return;
FolderFile file = queue.remove();
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(() -> apiCallDownloadFile(file, queue)));
}
}
private void apiCallDownloadFile(FolderFile file,
Queue<FolderFile> queue) throws IOException, ApiException {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
LOG.info("Downloading file");
File tempFile = mailboxFileManager.createTempFileForDownload();
try {
mailboxApi.getFile(mailboxProperties, file.folderId, file.fileId,
tempFile);
} catch (IOException | ApiException e) {
if (!tempFile.delete()) {
LOG.warning("Failed to delete temporary file");
}
throw e;
}
mailboxFileManager.handleDownloadedFile(tempFile);
deleteFile(file, queue);
}
private void deleteFile(FolderFile file, Queue<FolderFile> queue) {
synchronized (lock) {
if (state == State.DESTROYED) return;
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(() -> apiCallDeleteFile(file, queue)));
}
}
private void apiCallDeleteFile(FolderFile file, Queue<FolderFile> queue)
throws IOException, ApiException {
synchronized (lock) {
if (state == State.DESTROYED) return;
}
try {
mailboxApi.deleteFile(mailboxProperties, file.folderId,
file.fileId);
} catch (TolerableFailureException e) {
// Catch this so we can continue to the next file
logException(LOG, INFO, e);
}
if (queue.isEmpty()) {
// List the folders with available files again to check for files
// that may have arrived while we were downloading
synchronized (lock) {
if (state == State.DESTROYED) return;
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(this::apiCallListFolders));
}
} else {
downloadNextFile(queue);
}
}
@Override
public void onTorReachable() {
LOG.info("Our Tor hidden service is reachable");
synchronized (lock) {
if (state != State.WAITING_FOR_TOR) return;
state = State.DOWNLOAD_CYCLE_2;
// Start second download cycle
apiCall = mailboxApiCaller.retryWithBackoff(
new SimpleApiCall(this::apiCallListFolders));
}
}
// Package access for testing
static class FolderFile {
final MailboxFolderId folderId;
final MailboxFileId fileId;
private FolderFile(MailboxFolderId folderId, MailboxFileId fileId) {
this.folderId = folderId;
this.fileId = fileId;
}
}
}

View File

@@ -0,0 +1,339 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.Cancellable;
import org.briarproject.bramble.api.mailbox.MailboxFileId;
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
import org.briarproject.bramble.api.mailbox.MailboxProperties;
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
import org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.FolderFile;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.CaptureArgumentAction;
import org.jmock.Expectations;
import org.jmock.lib.action.DoAllAction;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
import static org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.MAX_ROUND_ROBIN_FILES;
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class OwnMailboxDownloadWorkerTest extends BrambleMockTestCase {
private final ConnectivityChecker connectivityChecker =
context.mock(ConnectivityChecker.class);
private final TorReachabilityMonitor torReachabilityMonitor =
context.mock(TorReachabilityMonitor.class);
private final MailboxApiCaller mailboxApiCaller =
context.mock(MailboxApiCaller.class);
private final MailboxApi mailboxApi = context.mock(MailboxApi.class);
private final MailboxFileManager mailboxFileManager =
context.mock(MailboxFileManager.class);
private final Cancellable apiCall = context.mock(Cancellable.class);
private final MailboxProperties mailboxProperties =
getMailboxProperties(true, CLIENT_SUPPORTS);
private final long now = System.currentTimeMillis();
private final MailboxFolderId folderId1 =
new MailboxFolderId(getRandomId());
private final MailboxFolderId folderId2 =
new MailboxFolderId(getRandomId());
private final List<MailboxFolderId> folderIds =
asList(folderId1, folderId2);
private final MailboxFile file1 =
new MailboxFile(new MailboxFileId(getRandomId()), now - 1);
private final MailboxFile file2 =
new MailboxFile(new MailboxFileId(getRandomId()), now);
private final List<MailboxFile> files = asList(file1, file2);
private File testDir, tempFile;
private OwnMailboxDownloadWorker worker;
@Before
public void setUp() {
testDir = getTestDirectory();
tempFile = new File(testDir, "temp");
worker = new OwnMailboxDownloadWorker(connectivityChecker,
torReachabilityMonitor, mailboxApiCaller, mailboxApi,
mailboxFileManager, mailboxProperties);
}
@After
public void tearDown() {
deleteTestDirectory(testDir);
}
@Test
public void testChecksConnectivityWhenStartedAndRemovesObserverWhenDestroyed() {
// When the worker is started it should start a connectivity check
expectStartConnectivityCheck();
worker.start();
// When the worker is destroyed it should remove the connectivity
// and reachability observers
expectRemoveObservers();
worker.destroy();
}
@Test
public void testChecksForFilesWhenConnectivityCheckSucceeds()
throws Exception {
// When the worker is started it should start a connectivity check
expectStartConnectivityCheck();
worker.start();
// When the connectivity check succeeds, a list-folders task should be
// started for the first download cycle
AtomicReference<ApiCall> listFoldersTask = new AtomicReference<>();
expectStartTask(listFoldersTask);
worker.onConnectivityCheckSucceeded();
// When the list-folders tasks runs and finds no folders with files
// to download, it should add a Tor reachability observer
expectCheckForFoldersWithAvailableFiles(emptyList());
expectAddReachabilityObserver();
assertFalse(listFoldersTask.get().callApi());
// When the reachability observer is called, a list-folders task should
// be started for the second download cycle
expectStartTask(listFoldersTask);
worker.onTorReachable();
// When the list-folders tasks runs and finds no folders with files
// to download, it should finish the second download cycle
expectCheckForFoldersWithAvailableFiles(emptyList());
assertFalse(listFoldersTask.get().callApi());
// When the worker is destroyed it should remove the connectivity
// and reachability observers
expectRemoveObservers();
worker.destroy();
}
@Test
public void testDownloadsFilesWhenConnectivityCheckSucceeds()
throws Exception {
// When the worker is started it should start a connectivity check
expectStartConnectivityCheck();
worker.start();
// When the connectivity check succeeds, a list-folders task should be
// started for the first download cycle
AtomicReference<ApiCall> listFoldersTask = new AtomicReference<>();
expectStartTask(listFoldersTask);
worker.onConnectivityCheckSucceeded();
// When the list-folders tasks runs and finds some folders with files
// to download, it should start a list-files task for the first folder
AtomicReference<ApiCall> listFilesTask = new AtomicReference<>();
expectCheckForFoldersWithAvailableFiles(folderIds);
expectStartTask(listFilesTask);
assertFalse(listFoldersTask.get().callApi());
// When the first list-files task runs and finds no files to download,
// it should start a second list-files task for the next folder
expectCheckForFiles(folderId1, emptyList());
expectStartTask(listFilesTask);
assertFalse(listFilesTask.get().callApi());
// When the second list-files task runs and finds some files to
// download, it should create the round-robin queue and start a
// download task for the first file
AtomicReference<ApiCall> downloadTask = new AtomicReference<>();
expectCheckForFiles(folderId2, files);
expectStartTask(downloadTask);
assertFalse(listFilesTask.get().callApi());
// When the first download task runs it should download the file to the
// location provided by the file manager and start a delete task
AtomicReference<ApiCall> deleteTask = new AtomicReference<>();
expectDownloadFile(folderId2, file1);
expectStartTask(deleteTask);
assertFalse(downloadTask.get().callApi());
// When the first delete task runs it should delete the file, ignore
// the tolerable failure, and start a download task for the next file
expectDeleteFile(folderId2, file1, true); // Delete fails tolerably
expectStartTask(downloadTask);
assertFalse(deleteTask.get().callApi());
// When the second download task runs it should download the file to
// the location provided by the file manager and start a delete task
expectDownloadFile(folderId2, file2);
expectStartTask(deleteTask);
assertFalse(downloadTask.get().callApi());
// When the second delete task runs it should delete the file and
// start a list-inbox task to check for files that may have arrived
// since the first download cycle started
expectDeleteFile(folderId2, file2, false); // Delete succeeds
expectStartTask(listFoldersTask);
assertFalse(deleteTask.get().callApi());
// When the list-inbox tasks runs and finds no more files to download,
// it should add a Tor reachability observer
expectCheckForFoldersWithAvailableFiles(emptyList());
expectAddReachabilityObserver();
assertFalse(listFoldersTask.get().callApi());
// When the reachability observer is called, a list-inbox task should
// be started for the second download cycle
expectStartTask(listFoldersTask);
worker.onTorReachable();
// When the list-inbox tasks runs and finds no more files to download,
// it should finish the second download cycle
expectCheckForFoldersWithAvailableFiles(emptyList());
assertFalse(listFoldersTask.get().callApi());
// When the worker is destroyed it should remove the connectivity
// and reachability observers
expectRemoveObservers();
worker.destroy();
}
@Test
public void testRoundRobinQueueVisitsAllFolders() {
// Ten folders with two files each
Map<MailboxFolderId, Queue<MailboxFile>> available =
createAvailableFiles(10, 2);
Queue<FolderFile> queue = worker.createRoundRobinQueue(available);
// Check that all files were queued
for (MailboxFolderId folderId : available.keySet()) {
assertEquals(2, countFilesWithFolderId(queue, folderId));
}
}
@Test
public void testSizeOfRoundRobinQueueIsLimited() {
// Two folders with MAX_ROUND_ROBIN_FILES each
Map<MailboxFolderId, Queue<MailboxFile>> available =
createAvailableFiles(2, MAX_ROUND_ROBIN_FILES);
Queue<FolderFile> queue = worker.createRoundRobinQueue(available);
// Check that half the files in each folder were queued
for (MailboxFolderId folderId : available.keySet()) {
assertEquals(MAX_ROUND_ROBIN_FILES / 2,
countFilesWithFolderId(queue, folderId));
}
}
private void expectStartConnectivityCheck() {
context.checking(new Expectations() {{
oneOf(connectivityChecker).checkConnectivity(
with(mailboxProperties), with(worker));
}});
}
private void expectStartTask(AtomicReference<ApiCall> task) {
context.checking(new Expectations() {{
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
will(new DoAllAction(
new CaptureArgumentAction<>(task, ApiCall.class, 0),
returnValue(apiCall)
));
}});
}
private void expectCheckForFoldersWithAvailableFiles(
List<MailboxFolderId> folderIds) throws Exception {
context.checking(new Expectations() {{
oneOf(mailboxApi).getFolders(mailboxProperties);
will(returnValue(folderIds));
}});
}
private void expectCheckForFiles(MailboxFolderId folderId,
List<MailboxFile> files) throws Exception {
context.checking(new Expectations() {{
oneOf(mailboxApi).getFiles(mailboxProperties, folderId);
will(returnValue(files));
}});
}
private void expectDownloadFile(MailboxFolderId folderId, MailboxFile file)
throws Exception {
context.checking(new Expectations() {{
oneOf(mailboxFileManager).createTempFileForDownload();
will(returnValue(tempFile));
oneOf(mailboxApi).getFile(mailboxProperties, folderId, file.name,
tempFile);
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
}});
}
private void expectDeleteFile(MailboxFolderId folderId, MailboxFile file,
boolean tolerableFailure) throws Exception {
context.checking(new Expectations() {{
oneOf(mailboxApi).deleteFile(mailboxProperties, folderId,
file.name);
if (tolerableFailure) {
will(throwException(new TolerableFailureException()));
}
}});
}
private void expectAddReachabilityObserver() {
context.checking(new Expectations() {{
oneOf(torReachabilityMonitor).addOneShotObserver(worker);
}});
}
private void expectRemoveObservers() {
context.checking(new Expectations() {{
oneOf(connectivityChecker).removeObserver(worker);
oneOf(torReachabilityMonitor).removeObserver(worker);
}});
}
private Map<MailboxFolderId, Queue<MailboxFile>> createAvailableFiles(
int numFolders, int numFiles) {
Map<MailboxFolderId, Queue<MailboxFile>> available = new HashMap<>();
List<MailboxFolderId> folderIds = createFolderIds(numFolders);
for (MailboxFolderId folderId : folderIds) {
available.put(folderId, createFiles(numFiles));
}
return available;
}
private List<MailboxFolderId> createFolderIds(int size) {
List<MailboxFolderId> folderIds = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
folderIds.add(new MailboxFolderId(getRandomId()));
}
return folderIds;
}
private Queue<MailboxFile> createFiles(int size) {
Queue<MailboxFile> files = new LinkedList<>();
for (int i = 0; i < size; i++) {
files.add(new MailboxFile(new MailboxFileId(getRandomId()), i));
}
return files;
}
private int countFilesWithFolderId(Queue<FolderFile> queue,
MailboxFolderId folderId) {
int count = 0;
for (FolderFile file : queue) {
if (file.folderId.equals(folderId)) count++;
}
return count;
}
}