From 9764aba47da240e6e6927f57f8d1476276f99818 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Fri, 15 Jul 2022 15:19:44 +0100 Subject: [PATCH] Add download worker for own mailbox. --- .../mailbox/OwnMailboxDownloadWorker.java | 341 ++++++++++++++++++ .../mailbox/OwnMailboxDownloadWorkerTest.java | 339 +++++++++++++++++ 2 files changed, 680 insertions(+) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorker.java create mode 100644 bramble-core/src/test/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorkerTest.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorker.java new file mode 100644 index 000000000..4a9a660d0 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorker.java @@ -0,0 +1,341 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.mailbox.MailboxFileId; +import org.briarproject.bramble.api.mailbox.MailboxFolderId; +import org.briarproject.bramble.api.mailbox.MailboxProperties; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver; +import org.briarproject.bramble.mailbox.MailboxApi.ApiException; +import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile; +import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException; +import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.logging.Logger; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import static java.util.Collections.shuffle; +import static java.util.logging.Level.INFO; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.util.LogUtils.logException; + +@ThreadSafe +@NotNullByDefault +class OwnMailboxDownloadWorker implements MailboxWorker, ConnectivityObserver, + TorReachabilityObserver { + + /** + * When the worker is started it waits for a connectivity check, then + * starts its first download cycle: checking for folders with available + * files, listing the files in each folder, downloading and deleting the + * files, and checking again until all files have been downloaded and + * deleted. + *

+ * The worker then waits for our Tor hidden service to be reachable before + * starting its second download cycle. This ensures that if a contact + * tried and failed to connect to our hidden service before it was + * reachable, and therefore uploaded a file to the mailbox instead, we'll + * find the file in the second download cycle. + */ + private enum State { + CREATED, + CONNECTIVITY_CHECK, + DOWNLOAD_CYCLE_1, + WAITING_FOR_TOR, + DOWNLOAD_CYCLE_2, + FINISHED, + DESTROYED + } + + /** + * The maximum number of files that will be downloaded before checking + * again for folders with available files. This ensures that if a file + * arrives during a download cycle, its folder will be checked within a + * reasonable amount of time even if another folder has a very large number + * of files. + *

+ * Package access for testing. + */ + static final int MAX_ROUND_ROBIN_FILES = 1000; + + private static final Logger LOG = + getLogger(OwnMailboxDownloadWorker.class.getName()); + + private final ConnectivityChecker connectivityChecker; + private final TorReachabilityMonitor torReachabilityMonitor; + private final MailboxApiCaller mailboxApiCaller; + private final MailboxApi mailboxApi; + private final MailboxFileManager mailboxFileManager; + private final MailboxProperties mailboxProperties; + private final Object lock = new Object(); + + @GuardedBy("lock") + private State state = State.CREATED; + + @GuardedBy("lock") + @Nullable + private Cancellable apiCall = null; + + OwnMailboxDownloadWorker( + ConnectivityChecker connectivityChecker, + TorReachabilityMonitor torReachabilityMonitor, + MailboxApiCaller mailboxApiCaller, + MailboxApi mailboxApi, + MailboxFileManager mailboxFileManager, + MailboxProperties mailboxProperties) { + if (!mailboxProperties.isOwner()) throw new IllegalArgumentException(); + this.connectivityChecker = connectivityChecker; + this.torReachabilityMonitor = torReachabilityMonitor; + this.mailboxApiCaller = mailboxApiCaller; + this.mailboxApi = mailboxApi; + this.mailboxFileManager = mailboxFileManager; + this.mailboxProperties = mailboxProperties; + } + + @Override + public void start() { + LOG.info("Started"); + synchronized (lock) { + // Don't allow the worker to be reused + if (state != State.CREATED) return; + state = State.CONNECTIVITY_CHECK; + } + // Avoid leaking observer in case destroy() is called concurrently + // before observer is added + connectivityChecker.checkConnectivity(mailboxProperties, this); + boolean destroyed; + synchronized (lock) { + destroyed = state == State.DESTROYED; + } + if (destroyed) connectivityChecker.removeObserver(this); + } + + @Override + public void destroy() { + LOG.info("Destroyed"); + Cancellable apiCall; + synchronized (lock) { + state = State.DESTROYED; + apiCall = this.apiCall; + this.apiCall = null; + } + if (apiCall != null) apiCall.cancel(); + connectivityChecker.removeObserver(this); + torReachabilityMonitor.removeObserver(this); + } + + @Override + public void onConnectivityCheckSucceeded() { + LOG.info("Connectivity check succeeded"); + synchronized (lock) { + if (state != State.CONNECTIVITY_CHECK) return; + state = State.DOWNLOAD_CYCLE_1; + // Start first download cycle + apiCall = mailboxApiCaller.retryWithBackoff( + new SimpleApiCall(this::apiCallListFolders)); + } + } + + private void apiCallListFolders() throws IOException, ApiException { + synchronized (lock) { + if (state == State.DESTROYED) return; + } + LOG.info("Listing folders with available files"); + List folders = + mailboxApi.getFolders(mailboxProperties); + if (folders.isEmpty()) onDownloadCycleFinished(); + else listNextFolder(new LinkedList<>(folders), new HashMap<>()); + } + + private void onDownloadCycleFinished() { + boolean addObserver = false; + synchronized (lock) { + if (state == State.DOWNLOAD_CYCLE_1) { + LOG.info("First download cycle finished"); + state = State.WAITING_FOR_TOR; + apiCall = null; + addObserver = true; + } else if (state == State.DOWNLOAD_CYCLE_2) { + LOG.info("Second download cycle finished"); + state = State.FINISHED; + apiCall = null; + } + } + if (addObserver) { + // Avoid leaking observer in case destroy() is called concurrently + // before observer is added + torReachabilityMonitor.addOneShotObserver(this); + boolean destroyed; + synchronized (lock) { + destroyed = state == State.DESTROYED; + } + if (destroyed) torReachabilityMonitor.removeObserver(this); + } + } + + private void listNextFolder(Queue queue, + Map> available) { + synchronized (lock) { + if (state == State.DESTROYED) return; + MailboxFolderId folder = queue.remove(); + apiCall = mailboxApiCaller.retryWithBackoff(new SimpleApiCall(() -> + apiCallListFolder(folder, queue, available))); + } + } + + private void apiCallListFolder(MailboxFolderId folder, + Queue queue, + Map> available) + throws IOException, ApiException { + synchronized (lock) { + if (state == State.DESTROYED) return; + } + LOG.info("Listing folder"); + List files = + mailboxApi.getFiles(mailboxProperties, folder); + if (!files.isEmpty()) available.put(folder, new LinkedList<>(files)); + if (queue.isEmpty()) { + LOG.info("Finished listing folders"); + if (available.isEmpty()) onDownloadCycleFinished(); + else createDownloadQueue(available); + } else { + listNextFolder(queue, available); + } + } + + private void createDownloadQueue( + Map> available) { + synchronized (lock) { + if (state == State.DESTROYED) return; + } + if (LOG.isLoggable(INFO)) { + LOG.info(available.size() + " folders have available files"); + } + Queue queue = createRoundRobinQueue(available); + if (LOG.isLoggable(INFO)) { + LOG.info("Downloading " + queue.size() + " files"); + } + downloadNextFile(queue); + } + + // Package access for testing + Queue createRoundRobinQueue( + Map> available) { + List roundRobin = new ArrayList<>(available.keySet()); + // Shuffle the folders so we don't always favour the same folders + shuffle(roundRobin); + Queue queue = new LinkedList<>(); + while (queue.size() < MAX_ROUND_ROBIN_FILES && !available.isEmpty()) { + Iterator it = roundRobin.iterator(); + while (queue.size() < MAX_ROUND_ROBIN_FILES && it.hasNext()) { + MailboxFolderId folder = it.next(); + Queue files = available.get(folder); + MailboxFile file = files.remove(); + queue.add(new FolderFile(folder, file.name)); + if (files.isEmpty()) { + available.remove(folder); + it.remove(); + } + } + } + return queue; + } + + private void downloadNextFile(Queue queue) { + synchronized (lock) { + if (state == State.DESTROYED) return; + FolderFile file = queue.remove(); + apiCall = mailboxApiCaller.retryWithBackoff( + new SimpleApiCall(() -> apiCallDownloadFile(file, queue))); + } + } + + private void apiCallDownloadFile(FolderFile file, + Queue queue) throws IOException, ApiException { + synchronized (lock) { + if (state == State.DESTROYED) return; + } + LOG.info("Downloading file"); + File tempFile = mailboxFileManager.createTempFileForDownload(); + try { + mailboxApi.getFile(mailboxProperties, file.folderId, file.fileId, + tempFile); + } catch (IOException | ApiException e) { + if (!tempFile.delete()) { + LOG.warning("Failed to delete temporary file"); + } + throw e; + } + mailboxFileManager.handleDownloadedFile(tempFile); + deleteFile(file, queue); + } + + private void deleteFile(FolderFile file, Queue queue) { + synchronized (lock) { + if (state == State.DESTROYED) return; + apiCall = mailboxApiCaller.retryWithBackoff( + new SimpleApiCall(() -> apiCallDeleteFile(file, queue))); + } + } + + private void apiCallDeleteFile(FolderFile file, Queue queue) + throws IOException, ApiException { + synchronized (lock) { + if (state == State.DESTROYED) return; + } + try { + mailboxApi.deleteFile(mailboxProperties, file.folderId, + file.fileId); + } catch (TolerableFailureException e) { + // Catch this so we can continue to the next file + logException(LOG, INFO, e); + } + if (queue.isEmpty()) { + // List the folders with available files again to check for files + // that may have arrived while we were downloading + synchronized (lock) { + if (state == State.DESTROYED) return; + apiCall = mailboxApiCaller.retryWithBackoff( + new SimpleApiCall(this::apiCallListFolders)); + } + } else { + downloadNextFile(queue); + } + } + + @Override + public void onTorReachable() { + LOG.info("Our Tor hidden service is reachable"); + synchronized (lock) { + if (state != State.WAITING_FOR_TOR) return; + state = State.DOWNLOAD_CYCLE_2; + // Start second download cycle + apiCall = mailboxApiCaller.retryWithBackoff( + new SimpleApiCall(this::apiCallListFolders)); + } + } + + // Package access for testing + static class FolderFile { + + final MailboxFolderId folderId; + final MailboxFileId fileId; + + private FolderFile(MailboxFolderId folderId, MailboxFileId fileId) { + this.folderId = folderId; + this.fileId = fileId; + } + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorkerTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorkerTest.java new file mode 100644 index 000000000..472626584 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/OwnMailboxDownloadWorkerTest.java @@ -0,0 +1,339 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.mailbox.MailboxFileId; +import org.briarproject.bramble.api.mailbox.MailboxFolderId; +import org.briarproject.bramble.api.mailbox.MailboxProperties; +import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile; +import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException; +import org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.FolderFile; +import org.briarproject.bramble.test.BrambleMockTestCase; +import org.briarproject.bramble.test.CaptureArgumentAction; +import org.jmock.Expectations; +import org.jmock.lib.action.DoAllAction; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS; +import static org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.MAX_ROUND_ROBIN_FILES; +import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory; +import static org.briarproject.bramble.test.TestUtils.getMailboxProperties; +import static org.briarproject.bramble.test.TestUtils.getRandomId; +import static org.briarproject.bramble.test.TestUtils.getTestDirectory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class OwnMailboxDownloadWorkerTest extends BrambleMockTestCase { + + private final ConnectivityChecker connectivityChecker = + context.mock(ConnectivityChecker.class); + private final TorReachabilityMonitor torReachabilityMonitor = + context.mock(TorReachabilityMonitor.class); + private final MailboxApiCaller mailboxApiCaller = + context.mock(MailboxApiCaller.class); + private final MailboxApi mailboxApi = context.mock(MailboxApi.class); + private final MailboxFileManager mailboxFileManager = + context.mock(MailboxFileManager.class); + private final Cancellable apiCall = context.mock(Cancellable.class); + + private final MailboxProperties mailboxProperties = + getMailboxProperties(true, CLIENT_SUPPORTS); + private final long now = System.currentTimeMillis(); + private final MailboxFolderId folderId1 = + new MailboxFolderId(getRandomId()); + private final MailboxFolderId folderId2 = + new MailboxFolderId(getRandomId()); + private final List folderIds = + asList(folderId1, folderId2); + private final MailboxFile file1 = + new MailboxFile(new MailboxFileId(getRandomId()), now - 1); + private final MailboxFile file2 = + new MailboxFile(new MailboxFileId(getRandomId()), now); + private final List files = asList(file1, file2); + + private File testDir, tempFile; + private OwnMailboxDownloadWorker worker; + + @Before + public void setUp() { + testDir = getTestDirectory(); + tempFile = new File(testDir, "temp"); + worker = new OwnMailboxDownloadWorker(connectivityChecker, + torReachabilityMonitor, mailboxApiCaller, mailboxApi, + mailboxFileManager, mailboxProperties); + } + + @After + public void tearDown() { + deleteTestDirectory(testDir); + } + + @Test + public void testChecksConnectivityWhenStartedAndRemovesObserverWhenDestroyed() { + // When the worker is started it should start a connectivity check + expectStartConnectivityCheck(); + worker.start(); + + // When the worker is destroyed it should remove the connectivity + // and reachability observers + expectRemoveObservers(); + worker.destroy(); + } + + @Test + public void testChecksForFilesWhenConnectivityCheckSucceeds() + throws Exception { + // When the worker is started it should start a connectivity check + expectStartConnectivityCheck(); + worker.start(); + + // When the connectivity check succeeds, a list-folders task should be + // started for the first download cycle + AtomicReference listFoldersTask = new AtomicReference<>(); + expectStartTask(listFoldersTask); + worker.onConnectivityCheckSucceeded(); + + // When the list-folders tasks runs and finds no folders with files + // to download, it should add a Tor reachability observer + expectCheckForFoldersWithAvailableFiles(emptyList()); + expectAddReachabilityObserver(); + assertFalse(listFoldersTask.get().callApi()); + + // When the reachability observer is called, a list-folders task should + // be started for the second download cycle + expectStartTask(listFoldersTask); + worker.onTorReachable(); + + // When the list-folders tasks runs and finds no folders with files + // to download, it should finish the second download cycle + expectCheckForFoldersWithAvailableFiles(emptyList()); + assertFalse(listFoldersTask.get().callApi()); + + // When the worker is destroyed it should remove the connectivity + // and reachability observers + expectRemoveObservers(); + worker.destroy(); + } + + @Test + public void testDownloadsFilesWhenConnectivityCheckSucceeds() + throws Exception { + // When the worker is started it should start a connectivity check + expectStartConnectivityCheck(); + worker.start(); + + // When the connectivity check succeeds, a list-folders task should be + // started for the first download cycle + AtomicReference listFoldersTask = new AtomicReference<>(); + expectStartTask(listFoldersTask); + worker.onConnectivityCheckSucceeded(); + + // When the list-folders tasks runs and finds some folders with files + // to download, it should start a list-files task for the first folder + AtomicReference listFilesTask = new AtomicReference<>(); + expectCheckForFoldersWithAvailableFiles(folderIds); + expectStartTask(listFilesTask); + assertFalse(listFoldersTask.get().callApi()); + + // When the first list-files task runs and finds no files to download, + // it should start a second list-files task for the next folder + expectCheckForFiles(folderId1, emptyList()); + expectStartTask(listFilesTask); + assertFalse(listFilesTask.get().callApi()); + + // When the second list-files task runs and finds some files to + // download, it should create the round-robin queue and start a + // download task for the first file + AtomicReference downloadTask = new AtomicReference<>(); + expectCheckForFiles(folderId2, files); + expectStartTask(downloadTask); + assertFalse(listFilesTask.get().callApi()); + + // When the first download task runs it should download the file to the + // location provided by the file manager and start a delete task + AtomicReference deleteTask = new AtomicReference<>(); + expectDownloadFile(folderId2, file1); + expectStartTask(deleteTask); + assertFalse(downloadTask.get().callApi()); + + // When the first delete task runs it should delete the file, ignore + // the tolerable failure, and start a download task for the next file + expectDeleteFile(folderId2, file1, true); // Delete fails tolerably + expectStartTask(downloadTask); + assertFalse(deleteTask.get().callApi()); + + // When the second download task runs it should download the file to + // the location provided by the file manager and start a delete task + expectDownloadFile(folderId2, file2); + expectStartTask(deleteTask); + assertFalse(downloadTask.get().callApi()); + + // When the second delete task runs it should delete the file and + // start a list-inbox task to check for files that may have arrived + // since the first download cycle started + expectDeleteFile(folderId2, file2, false); // Delete succeeds + expectStartTask(listFoldersTask); + assertFalse(deleteTask.get().callApi()); + + // When the list-inbox tasks runs and finds no more files to download, + // it should add a Tor reachability observer + expectCheckForFoldersWithAvailableFiles(emptyList()); + expectAddReachabilityObserver(); + assertFalse(listFoldersTask.get().callApi()); + + // When the reachability observer is called, a list-inbox task should + // be started for the second download cycle + expectStartTask(listFoldersTask); + worker.onTorReachable(); + + // When the list-inbox tasks runs and finds no more files to download, + // it should finish the second download cycle + expectCheckForFoldersWithAvailableFiles(emptyList()); + assertFalse(listFoldersTask.get().callApi()); + + // When the worker is destroyed it should remove the connectivity + // and reachability observers + expectRemoveObservers(); + worker.destroy(); + } + + @Test + public void testRoundRobinQueueVisitsAllFolders() { + // Ten folders with two files each + Map> available = + createAvailableFiles(10, 2); + Queue queue = worker.createRoundRobinQueue(available); + // Check that all files were queued + for (MailboxFolderId folderId : available.keySet()) { + assertEquals(2, countFilesWithFolderId(queue, folderId)); + } + } + + @Test + public void testSizeOfRoundRobinQueueIsLimited() { + // Two folders with MAX_ROUND_ROBIN_FILES each + Map> available = + createAvailableFiles(2, MAX_ROUND_ROBIN_FILES); + Queue queue = worker.createRoundRobinQueue(available); + // Check that half the files in each folder were queued + for (MailboxFolderId folderId : available.keySet()) { + assertEquals(MAX_ROUND_ROBIN_FILES / 2, + countFilesWithFolderId(queue, folderId)); + } + } + + private void expectStartConnectivityCheck() { + context.checking(new Expectations() {{ + oneOf(connectivityChecker).checkConnectivity( + with(mailboxProperties), with(worker)); + }}); + } + + private void expectStartTask(AtomicReference task) { + context.checking(new Expectations() {{ + oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); + will(new DoAllAction( + new CaptureArgumentAction<>(task, ApiCall.class, 0), + returnValue(apiCall) + )); + }}); + } + + private void expectCheckForFoldersWithAvailableFiles( + List folderIds) throws Exception { + context.checking(new Expectations() {{ + oneOf(mailboxApi).getFolders(mailboxProperties); + will(returnValue(folderIds)); + }}); + } + + private void expectCheckForFiles(MailboxFolderId folderId, + List files) throws Exception { + context.checking(new Expectations() {{ + oneOf(mailboxApi).getFiles(mailboxProperties, folderId); + will(returnValue(files)); + }}); + } + + private void expectDownloadFile(MailboxFolderId folderId, MailboxFile file) + throws Exception { + context.checking(new Expectations() {{ + oneOf(mailboxFileManager).createTempFileForDownload(); + will(returnValue(tempFile)); + oneOf(mailboxApi).getFile(mailboxProperties, folderId, file.name, + tempFile); + oneOf(mailboxFileManager).handleDownloadedFile(tempFile); + }}); + } + + private void expectDeleteFile(MailboxFolderId folderId, MailboxFile file, + boolean tolerableFailure) throws Exception { + context.checking(new Expectations() {{ + oneOf(mailboxApi).deleteFile(mailboxProperties, folderId, + file.name); + if (tolerableFailure) { + will(throwException(new TolerableFailureException())); + } + }}); + } + + private void expectAddReachabilityObserver() { + context.checking(new Expectations() {{ + oneOf(torReachabilityMonitor).addOneShotObserver(worker); + }}); + } + + private void expectRemoveObservers() { + context.checking(new Expectations() {{ + oneOf(connectivityChecker).removeObserver(worker); + oneOf(torReachabilityMonitor).removeObserver(worker); + }}); + } + + private Map> createAvailableFiles( + int numFolders, int numFiles) { + Map> available = new HashMap<>(); + List folderIds = createFolderIds(numFolders); + for (MailboxFolderId folderId : folderIds) { + available.put(folderId, createFiles(numFiles)); + } + return available; + } + + private List createFolderIds(int size) { + List folderIds = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + folderIds.add(new MailboxFolderId(getRandomId())); + } + return folderIds; + } + + private Queue createFiles(int size) { + Queue files = new LinkedList<>(); + for (int i = 0; i < size; i++) { + files.add(new MailboxFile(new MailboxFileId(getRandomId()), i)); + } + return files; + } + + private int countFilesWithFolderId(Queue queue, + MailboxFolderId folderId) { + int count = 0; + for (FolderFile file : queue) { + if (file.folderId.equals(folderId)) count++; + } + return count; + } +}