mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-21 07:09:56 +01:00
Refactor duplicated code into superclass.
This commit is contained in:
@@ -1,73 +1,23 @@
|
|||||||
package org.briarproject.bramble.mailbox;
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
import org.briarproject.bramble.api.Cancellable;
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
|
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
|
||||||
import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.logging.Logger;
|
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import javax.annotation.concurrent.GuardedBy;
|
|
||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
import static java.util.logging.Level.INFO;
|
|
||||||
import static java.util.logging.Logger.getLogger;
|
|
||||||
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
||||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
|
||||||
|
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
@NotNullByDefault
|
@NotNullByDefault
|
||||||
class ContactMailboxDownloadWorker implements MailboxWorker,
|
class ContactMailboxDownloadWorker extends MailboxDownloadWorker {
|
||||||
ConnectivityObserver, TorReachabilityObserver {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* When the worker is started it waits for a connectivity check, then
|
|
||||||
* starts its first download cycle: checking the inbox, downloading and
|
|
||||||
* deleting any files, and checking again until the inbox is empty.
|
|
||||||
* <p>
|
|
||||||
* The worker then waits for our Tor hidden service to be reachable before
|
|
||||||
* starting its second download cycle. This ensures that if a contact
|
|
||||||
* tried and failed to connect to our hidden service before it was
|
|
||||||
* reachable, and therefore uploaded a file to the mailbox instead, we'll
|
|
||||||
* find the file in the second download cycle.
|
|
||||||
*/
|
|
||||||
private enum State {
|
|
||||||
CREATED,
|
|
||||||
CONNECTIVITY_CHECK,
|
|
||||||
DOWNLOAD_CYCLE_1,
|
|
||||||
WAITING_FOR_TOR,
|
|
||||||
DOWNLOAD_CYCLE_2,
|
|
||||||
FINISHED,
|
|
||||||
DESTROYED
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
getLogger(ContactMailboxDownloadWorker.class.getName());
|
|
||||||
|
|
||||||
private final ConnectivityChecker connectivityChecker;
|
|
||||||
private final TorReachabilityMonitor torReachabilityMonitor;
|
|
||||||
private final MailboxApiCaller mailboxApiCaller;
|
|
||||||
private final MailboxApi mailboxApi;
|
|
||||||
private final MailboxFileManager mailboxFileManager;
|
|
||||||
private final MailboxProperties mailboxProperties;
|
|
||||||
private final Object lock = new Object();
|
|
||||||
|
|
||||||
@GuardedBy("lock")
|
|
||||||
private State state = State.CREATED;
|
|
||||||
|
|
||||||
@GuardedBy("lock")
|
|
||||||
@Nullable
|
|
||||||
private Cancellable apiCall = null;
|
|
||||||
|
|
||||||
ContactMailboxDownloadWorker(
|
ContactMailboxDownloadWorker(
|
||||||
ConnectivityChecker connectivityChecker,
|
ConnectivityChecker connectivityChecker,
|
||||||
@@ -76,57 +26,14 @@ class ContactMailboxDownloadWorker implements MailboxWorker,
|
|||||||
MailboxApi mailboxApi,
|
MailboxApi mailboxApi,
|
||||||
MailboxFileManager mailboxFileManager,
|
MailboxFileManager mailboxFileManager,
|
||||||
MailboxProperties mailboxProperties) {
|
MailboxProperties mailboxProperties) {
|
||||||
|
super(connectivityChecker, torReachabilityMonitor, mailboxApiCaller,
|
||||||
|
mailboxApi, mailboxFileManager, mailboxProperties);
|
||||||
if (mailboxProperties.isOwner()) throw new IllegalArgumentException();
|
if (mailboxProperties.isOwner()) throw new IllegalArgumentException();
|
||||||
this.connectivityChecker = connectivityChecker;
|
|
||||||
this.torReachabilityMonitor = torReachabilityMonitor;
|
|
||||||
this.mailboxApiCaller = mailboxApiCaller;
|
|
||||||
this.mailboxApi = mailboxApi;
|
|
||||||
this.mailboxFileManager = mailboxFileManager;
|
|
||||||
this.mailboxProperties = mailboxProperties;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
protected ApiCall createApiCallForDownloadCycle() {
|
||||||
LOG.info("Started");
|
return new SimpleApiCall(this::apiCallListInbox);
|
||||||
synchronized (lock) {
|
|
||||||
// Don't allow the worker to be reused
|
|
||||||
if (state != State.CREATED) return;
|
|
||||||
state = State.CONNECTIVITY_CHECK;
|
|
||||||
}
|
|
||||||
// Avoid leaking observer in case destroy() is called concurrently
|
|
||||||
// before observer is added
|
|
||||||
connectivityChecker.checkConnectivity(mailboxProperties, this);
|
|
||||||
boolean destroyed;
|
|
||||||
synchronized (lock) {
|
|
||||||
destroyed = state == State.DESTROYED;
|
|
||||||
}
|
|
||||||
if (destroyed) connectivityChecker.removeObserver(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void destroy() {
|
|
||||||
LOG.info("Destroyed");
|
|
||||||
Cancellable apiCall;
|
|
||||||
synchronized (lock) {
|
|
||||||
state = State.DESTROYED;
|
|
||||||
apiCall = this.apiCall;
|
|
||||||
this.apiCall = null;
|
|
||||||
}
|
|
||||||
if (apiCall != null) apiCall.cancel();
|
|
||||||
connectivityChecker.removeObserver(this);
|
|
||||||
torReachabilityMonitor.removeObserver(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onConnectivityCheckSucceeded() {
|
|
||||||
LOG.info("Connectivity check succeeded");
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state != State.CONNECTIVITY_CHECK) return;
|
|
||||||
state = State.DOWNLOAD_CYCLE_1;
|
|
||||||
// Start first download cycle
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListInbox));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void apiCallListInbox() throws IOException, ApiException {
|
private void apiCallListInbox() throws IOException, ApiException {
|
||||||
@@ -134,110 +41,18 @@ class ContactMailboxDownloadWorker implements MailboxWorker,
|
|||||||
if (state == State.DESTROYED) return;
|
if (state == State.DESTROYED) return;
|
||||||
}
|
}
|
||||||
LOG.info("Listing inbox");
|
LOG.info("Listing inbox");
|
||||||
List<MailboxFile> files = mailboxApi.getFiles(mailboxProperties,
|
MailboxFolderId folderId =
|
||||||
requireNonNull(mailboxProperties.getInboxId()));
|
requireNonNull(mailboxProperties.getInboxId());
|
||||||
if (files.isEmpty()) onDownloadCycleFinished();
|
List<MailboxFile> files =
|
||||||
else downloadNextFile(new LinkedList<>(files));
|
mailboxApi.getFiles(mailboxProperties, folderId);
|
||||||
}
|
if (files.isEmpty()) {
|
||||||
|
onDownloadCycleFinished();
|
||||||
private void onDownloadCycleFinished() {
|
|
||||||
boolean addObserver = false;
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DOWNLOAD_CYCLE_1) {
|
|
||||||
LOG.info("First download cycle finished");
|
|
||||||
state = State.WAITING_FOR_TOR;
|
|
||||||
apiCall = null;
|
|
||||||
addObserver = true;
|
|
||||||
} else if (state == State.DOWNLOAD_CYCLE_2) {
|
|
||||||
LOG.info("Second download cycle finished");
|
|
||||||
state = State.FINISHED;
|
|
||||||
apiCall = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (addObserver) {
|
|
||||||
// Avoid leaking observer in case destroy() is called concurrently
|
|
||||||
// before observer is added
|
|
||||||
torReachabilityMonitor.addOneShotObserver(this);
|
|
||||||
boolean destroyed;
|
|
||||||
synchronized (lock) {
|
|
||||||
destroyed = state == State.DESTROYED;
|
|
||||||
}
|
|
||||||
if (destroyed) torReachabilityMonitor.removeObserver(this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void downloadNextFile(Queue<MailboxFile> queue) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
MailboxFile file = queue.remove();
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(() -> apiCallDownloadFile(file, queue)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void apiCallDownloadFile(MailboxFile file,
|
|
||||||
Queue<MailboxFile> queue) throws IOException, ApiException {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
}
|
|
||||||
LOG.info("Downloading file");
|
|
||||||
File tempFile = mailboxFileManager.createTempFileForDownload();
|
|
||||||
try {
|
|
||||||
mailboxApi.getFile(mailboxProperties,
|
|
||||||
requireNonNull(mailboxProperties.getInboxId()),
|
|
||||||
file.name, tempFile);
|
|
||||||
} catch (IOException | ApiException e) {
|
|
||||||
if (!tempFile.delete()) {
|
|
||||||
LOG.warning("Failed to delete temporary file");
|
|
||||||
}
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
mailboxFileManager.handleDownloadedFile(tempFile);
|
|
||||||
deleteFile(file, queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void deleteFile(MailboxFile file, Queue<MailboxFile> queue) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(() -> apiCallDeleteFile(file, queue)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void apiCallDeleteFile(MailboxFile file, Queue<MailboxFile> queue)
|
|
||||||
throws IOException, ApiException {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
mailboxApi.deleteFile(mailboxProperties,
|
|
||||||
requireNonNull(mailboxProperties.getInboxId()), file.name);
|
|
||||||
} catch (TolerableFailureException e) {
|
|
||||||
// Catch this so we can continue to the next file
|
|
||||||
logException(LOG, INFO, e);
|
|
||||||
}
|
|
||||||
if (queue.isEmpty()) {
|
|
||||||
// List the inbox again to check for files that may have arrived
|
|
||||||
// while we were downloading
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListInbox));
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
Queue<FolderFile> queue = new LinkedList<>();
|
||||||
|
for (MailboxFile file : files) {
|
||||||
|
queue.add(new FolderFile(folderId, file.name));
|
||||||
|
}
|
||||||
downloadNextFile(queue);
|
downloadNextFile(queue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onTorReachable() {
|
|
||||||
LOG.info("Our Tor hidden service is reachable");
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state != State.WAITING_FOR_TOR) return;
|
|
||||||
state = State.DOWNLOAD_CYCLE_2;
|
|
||||||
// Start second download cycle
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListInbox));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,246 @@
|
|||||||
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.Cancellable;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
||||||
|
import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import javax.annotation.concurrent.GuardedBy;
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
|
import static java.util.logging.Level.INFO;
|
||||||
|
import static java.util.logging.Logger.getLogger;
|
||||||
|
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||||
|
|
||||||
|
@ThreadSafe
|
||||||
|
@NotNullByDefault
|
||||||
|
abstract class MailboxDownloadWorker implements MailboxWorker,
|
||||||
|
ConnectivityObserver, TorReachabilityObserver {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When the worker is started it waits for a connectivity check, then
|
||||||
|
* starts its first download cycle: checking for files to download,
|
||||||
|
* downloading and deleting the files, and checking again until all files
|
||||||
|
* have been downloaded and deleted.
|
||||||
|
* <p>
|
||||||
|
* The worker then waits for our Tor hidden service to be reachable before
|
||||||
|
* starting its second download cycle. This ensures that if a contact
|
||||||
|
* tried and failed to connect to our hidden service before it was
|
||||||
|
* reachable, and therefore uploaded a file to the mailbox instead, we'll
|
||||||
|
* find the file in the second download cycle.
|
||||||
|
*/
|
||||||
|
protected enum State {
|
||||||
|
CREATED,
|
||||||
|
CONNECTIVITY_CHECK,
|
||||||
|
DOWNLOAD_CYCLE_1,
|
||||||
|
WAITING_FOR_TOR,
|
||||||
|
DOWNLOAD_CYCLE_2,
|
||||||
|
FINISHED,
|
||||||
|
DESTROYED
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static final Logger LOG =
|
||||||
|
getLogger(MailboxDownloadWorker.class.getName());
|
||||||
|
|
||||||
|
private final ConnectivityChecker connectivityChecker;
|
||||||
|
private final TorReachabilityMonitor torReachabilityMonitor;
|
||||||
|
protected final MailboxApiCaller mailboxApiCaller;
|
||||||
|
protected final MailboxApi mailboxApi;
|
||||||
|
private final MailboxFileManager mailboxFileManager;
|
||||||
|
protected final MailboxProperties mailboxProperties;
|
||||||
|
protected final Object lock = new Object();
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
protected State state = State.CREATED;
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
@Nullable
|
||||||
|
protected Cancellable apiCall = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the API call that starts the worker's download cycle.
|
||||||
|
*/
|
||||||
|
protected abstract ApiCall createApiCallForDownloadCycle();
|
||||||
|
|
||||||
|
MailboxDownloadWorker(
|
||||||
|
ConnectivityChecker connectivityChecker,
|
||||||
|
TorReachabilityMonitor torReachabilityMonitor,
|
||||||
|
MailboxApiCaller mailboxApiCaller,
|
||||||
|
MailboxApi mailboxApi,
|
||||||
|
MailboxFileManager mailboxFileManager,
|
||||||
|
MailboxProperties mailboxProperties) {
|
||||||
|
this.connectivityChecker = connectivityChecker;
|
||||||
|
this.torReachabilityMonitor = torReachabilityMonitor;
|
||||||
|
this.mailboxApiCaller = mailboxApiCaller;
|
||||||
|
this.mailboxApi = mailboxApi;
|
||||||
|
this.mailboxFileManager = mailboxFileManager;
|
||||||
|
this.mailboxProperties = mailboxProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
LOG.info("Started");
|
||||||
|
synchronized (lock) {
|
||||||
|
// Don't allow the worker to be reused
|
||||||
|
if (state != State.CREATED) return;
|
||||||
|
state = State.CONNECTIVITY_CHECK;
|
||||||
|
}
|
||||||
|
// Avoid leaking observer in case destroy() is called concurrently
|
||||||
|
// before observer is added
|
||||||
|
connectivityChecker.checkConnectivity(mailboxProperties, this);
|
||||||
|
boolean destroyed;
|
||||||
|
synchronized (lock) {
|
||||||
|
destroyed = state == State.DESTROYED;
|
||||||
|
}
|
||||||
|
if (destroyed) connectivityChecker.removeObserver(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
LOG.info("Destroyed");
|
||||||
|
Cancellable apiCall;
|
||||||
|
synchronized (lock) {
|
||||||
|
state = State.DESTROYED;
|
||||||
|
apiCall = this.apiCall;
|
||||||
|
this.apiCall = null;
|
||||||
|
}
|
||||||
|
if (apiCall != null) apiCall.cancel();
|
||||||
|
connectivityChecker.removeObserver(this);
|
||||||
|
torReachabilityMonitor.removeObserver(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConnectivityCheckSucceeded() {
|
||||||
|
LOG.info("Connectivity check succeeded");
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.CONNECTIVITY_CHECK) return;
|
||||||
|
state = State.DOWNLOAD_CYCLE_1;
|
||||||
|
// Start first download cycle
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
createApiCallForDownloadCycle());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void onDownloadCycleFinished() {
|
||||||
|
boolean addObserver = false;
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DOWNLOAD_CYCLE_1) {
|
||||||
|
LOG.info("First download cycle finished");
|
||||||
|
state = State.WAITING_FOR_TOR;
|
||||||
|
apiCall = null;
|
||||||
|
addObserver = true;
|
||||||
|
} else if (state == State.DOWNLOAD_CYCLE_2) {
|
||||||
|
LOG.info("Second download cycle finished");
|
||||||
|
state = State.FINISHED;
|
||||||
|
apiCall = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (addObserver) {
|
||||||
|
// Avoid leaking observer in case destroy() is called concurrently
|
||||||
|
// before observer is added
|
||||||
|
torReachabilityMonitor.addOneShotObserver(this);
|
||||||
|
boolean destroyed;
|
||||||
|
synchronized (lock) {
|
||||||
|
destroyed = state == State.DESTROYED;
|
||||||
|
}
|
||||||
|
if (destroyed) torReachabilityMonitor.removeObserver(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void downloadNextFile(Queue<FolderFile> queue) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
FolderFile file = queue.remove();
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
new SimpleApiCall(() -> apiCallDownloadFile(file, queue)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void apiCallDownloadFile(FolderFile file, Queue<FolderFile> queue)
|
||||||
|
throws IOException, ApiException {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
}
|
||||||
|
LOG.info("Downloading file");
|
||||||
|
File tempFile = mailboxFileManager.createTempFileForDownload();
|
||||||
|
try {
|
||||||
|
mailboxApi.getFile(mailboxProperties, file.folderId, file.fileId,
|
||||||
|
tempFile);
|
||||||
|
} catch (IOException | ApiException e) {
|
||||||
|
if (!tempFile.delete()) {
|
||||||
|
LOG.warning("Failed to delete temporary file");
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
mailboxFileManager.handleDownloadedFile(tempFile);
|
||||||
|
deleteFile(file, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteFile(FolderFile file, Queue<FolderFile> queue) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
new SimpleApiCall(() -> apiCallDeleteFile(file, queue)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void apiCallDeleteFile(FolderFile file, Queue<FolderFile> queue)
|
||||||
|
throws IOException, ApiException {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
mailboxApi.deleteFile(mailboxProperties, file.folderId,
|
||||||
|
file.fileId);
|
||||||
|
} catch (TolerableFailureException e) {
|
||||||
|
// Catch this so we can continue to the next file
|
||||||
|
logException(LOG, INFO, e);
|
||||||
|
}
|
||||||
|
if (queue.isEmpty()) {
|
||||||
|
// Check for files again, as new files may have arrived while we
|
||||||
|
// were downloading
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.DESTROYED) return;
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
createApiCallForDownloadCycle());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
downloadNextFile(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTorReachable() {
|
||||||
|
LOG.info("Our Tor hidden service is reachable");
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.WAITING_FOR_TOR) return;
|
||||||
|
state = State.DOWNLOAD_CYCLE_2;
|
||||||
|
// Start second download cycle
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
createApiCallForDownloadCycle());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Package access for testing
|
||||||
|
static class FolderFile {
|
||||||
|
|
||||||
|
final MailboxFolderId folderId;
|
||||||
|
final MailboxFileId fileId;
|
||||||
|
|
||||||
|
FolderFile(MailboxFolderId folderId, MailboxFileId fileId) {
|
||||||
|
this.folderId = folderId;
|
||||||
|
this.fileId = fileId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,17 +1,11 @@
|
|||||||
package org.briarproject.bramble.mailbox;
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
import org.briarproject.bramble.api.Cancellable;
|
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
|
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
|
||||||
import org.briarproject.bramble.mailbox.TorReachabilityMonitor.TorReachabilityObserver;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@@ -20,74 +14,27 @@ import java.util.LinkedList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.logging.Logger;
|
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import javax.annotation.concurrent.GuardedBy;
|
|
||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
import static java.util.Collections.shuffle;
|
import static java.util.Collections.shuffle;
|
||||||
import static java.util.logging.Level.INFO;
|
import static java.util.logging.Level.INFO;
|
||||||
import static java.util.logging.Logger.getLogger;
|
|
||||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
|
||||||
|
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
@NotNullByDefault
|
@NotNullByDefault
|
||||||
class OwnMailboxDownloadWorker implements MailboxWorker, ConnectivityObserver,
|
class OwnMailboxDownloadWorker extends MailboxDownloadWorker {
|
||||||
TorReachabilityObserver {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* When the worker is started it waits for a connectivity check, then
|
|
||||||
* starts its first download cycle: checking for folders with available
|
|
||||||
* files, listing the files in each folder, downloading and deleting the
|
|
||||||
* files, and checking again until all files have been downloaded and
|
|
||||||
* deleted.
|
|
||||||
* <p>
|
|
||||||
* The worker then waits for our Tor hidden service to be reachable before
|
|
||||||
* starting its second download cycle. This ensures that if a contact
|
|
||||||
* tried and failed to connect to our hidden service before it was
|
|
||||||
* reachable, and therefore uploaded a file to the mailbox instead, we'll
|
|
||||||
* find the file in the second download cycle.
|
|
||||||
*/
|
|
||||||
private enum State {
|
|
||||||
CREATED,
|
|
||||||
CONNECTIVITY_CHECK,
|
|
||||||
DOWNLOAD_CYCLE_1,
|
|
||||||
WAITING_FOR_TOR,
|
|
||||||
DOWNLOAD_CYCLE_2,
|
|
||||||
FINISHED,
|
|
||||||
DESTROYED
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of files that will be downloaded before checking
|
* The maximum number of files that will be downloaded before checking
|
||||||
* again for folders with available files. This ensures that if a file
|
* again for folders with available files. This ensures that if a file
|
||||||
* arrives during a download cycle, its folder will be checked within a
|
* arrives during a download cycle, its folder will be checked within a
|
||||||
* reasonable amount of time even if another folder has a very large number
|
* reasonable amount of time even if some other folder has a very large
|
||||||
* of files.
|
* number of files.
|
||||||
* <p>
|
* <p>
|
||||||
* Package access for testing.
|
* Package access for testing.
|
||||||
*/
|
*/
|
||||||
static final int MAX_ROUND_ROBIN_FILES = 1000;
|
static final int MAX_ROUND_ROBIN_FILES = 1000;
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
getLogger(OwnMailboxDownloadWorker.class.getName());
|
|
||||||
|
|
||||||
private final ConnectivityChecker connectivityChecker;
|
|
||||||
private final TorReachabilityMonitor torReachabilityMonitor;
|
|
||||||
private final MailboxApiCaller mailboxApiCaller;
|
|
||||||
private final MailboxApi mailboxApi;
|
|
||||||
private final MailboxFileManager mailboxFileManager;
|
|
||||||
private final MailboxProperties mailboxProperties;
|
|
||||||
private final Object lock = new Object();
|
|
||||||
|
|
||||||
@GuardedBy("lock")
|
|
||||||
private State state = State.CREATED;
|
|
||||||
|
|
||||||
@GuardedBy("lock")
|
|
||||||
@Nullable
|
|
||||||
private Cancellable apiCall = null;
|
|
||||||
|
|
||||||
OwnMailboxDownloadWorker(
|
OwnMailboxDownloadWorker(
|
||||||
ConnectivityChecker connectivityChecker,
|
ConnectivityChecker connectivityChecker,
|
||||||
TorReachabilityMonitor torReachabilityMonitor,
|
TorReachabilityMonitor torReachabilityMonitor,
|
||||||
@@ -95,57 +42,14 @@ class OwnMailboxDownloadWorker implements MailboxWorker, ConnectivityObserver,
|
|||||||
MailboxApi mailboxApi,
|
MailboxApi mailboxApi,
|
||||||
MailboxFileManager mailboxFileManager,
|
MailboxFileManager mailboxFileManager,
|
||||||
MailboxProperties mailboxProperties) {
|
MailboxProperties mailboxProperties) {
|
||||||
|
super(connectivityChecker, torReachabilityMonitor, mailboxApiCaller,
|
||||||
|
mailboxApi, mailboxFileManager, mailboxProperties);
|
||||||
if (!mailboxProperties.isOwner()) throw new IllegalArgumentException();
|
if (!mailboxProperties.isOwner()) throw new IllegalArgumentException();
|
||||||
this.connectivityChecker = connectivityChecker;
|
|
||||||
this.torReachabilityMonitor = torReachabilityMonitor;
|
|
||||||
this.mailboxApiCaller = mailboxApiCaller;
|
|
||||||
this.mailboxApi = mailboxApi;
|
|
||||||
this.mailboxFileManager = mailboxFileManager;
|
|
||||||
this.mailboxProperties = mailboxProperties;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
protected ApiCall createApiCallForDownloadCycle() {
|
||||||
LOG.info("Started");
|
return new SimpleApiCall(this::apiCallListFolders);
|
||||||
synchronized (lock) {
|
|
||||||
// Don't allow the worker to be reused
|
|
||||||
if (state != State.CREATED) return;
|
|
||||||
state = State.CONNECTIVITY_CHECK;
|
|
||||||
}
|
|
||||||
// Avoid leaking observer in case destroy() is called concurrently
|
|
||||||
// before observer is added
|
|
||||||
connectivityChecker.checkConnectivity(mailboxProperties, this);
|
|
||||||
boolean destroyed;
|
|
||||||
synchronized (lock) {
|
|
||||||
destroyed = state == State.DESTROYED;
|
|
||||||
}
|
|
||||||
if (destroyed) connectivityChecker.removeObserver(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void destroy() {
|
|
||||||
LOG.info("Destroyed");
|
|
||||||
Cancellable apiCall;
|
|
||||||
synchronized (lock) {
|
|
||||||
state = State.DESTROYED;
|
|
||||||
apiCall = this.apiCall;
|
|
||||||
this.apiCall = null;
|
|
||||||
}
|
|
||||||
if (apiCall != null) apiCall.cancel();
|
|
||||||
connectivityChecker.removeObserver(this);
|
|
||||||
torReachabilityMonitor.removeObserver(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onConnectivityCheckSucceeded() {
|
|
||||||
LOG.info("Connectivity check succeeded");
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state != State.CONNECTIVITY_CHECK) return;
|
|
||||||
state = State.DOWNLOAD_CYCLE_1;
|
|
||||||
// Start first download cycle
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListFolders));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void apiCallListFolders() throws IOException, ApiException {
|
private void apiCallListFolders() throws IOException, ApiException {
|
||||||
@@ -159,32 +63,6 @@ class OwnMailboxDownloadWorker implements MailboxWorker, ConnectivityObserver,
|
|||||||
else listNextFolder(new LinkedList<>(folders), new HashMap<>());
|
else listNextFolder(new LinkedList<>(folders), new HashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onDownloadCycleFinished() {
|
|
||||||
boolean addObserver = false;
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DOWNLOAD_CYCLE_1) {
|
|
||||||
LOG.info("First download cycle finished");
|
|
||||||
state = State.WAITING_FOR_TOR;
|
|
||||||
apiCall = null;
|
|
||||||
addObserver = true;
|
|
||||||
} else if (state == State.DOWNLOAD_CYCLE_2) {
|
|
||||||
LOG.info("Second download cycle finished");
|
|
||||||
state = State.FINISHED;
|
|
||||||
apiCall = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (addObserver) {
|
|
||||||
// Avoid leaking observer in case destroy() is called concurrently
|
|
||||||
// before observer is added
|
|
||||||
torReachabilityMonitor.addOneShotObserver(this);
|
|
||||||
boolean destroyed;
|
|
||||||
synchronized (lock) {
|
|
||||||
destroyed = state == State.DESTROYED;
|
|
||||||
}
|
|
||||||
if (destroyed) torReachabilityMonitor.removeObserver(this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void listNextFolder(Queue<MailboxFolderId> queue,
|
private void listNextFolder(Queue<MailboxFolderId> queue,
|
||||||
Map<MailboxFolderId, Queue<MailboxFile>> available) {
|
Map<MailboxFolderId, Queue<MailboxFile>> available) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
@@ -252,90 +130,4 @@ class OwnMailboxDownloadWorker implements MailboxWorker, ConnectivityObserver,
|
|||||||
}
|
}
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void downloadNextFile(Queue<FolderFile> queue) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
FolderFile file = queue.remove();
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(() -> apiCallDownloadFile(file, queue)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void apiCallDownloadFile(FolderFile file,
|
|
||||||
Queue<FolderFile> queue) throws IOException, ApiException {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
}
|
|
||||||
LOG.info("Downloading file");
|
|
||||||
File tempFile = mailboxFileManager.createTempFileForDownload();
|
|
||||||
try {
|
|
||||||
mailboxApi.getFile(mailboxProperties, file.folderId, file.fileId,
|
|
||||||
tempFile);
|
|
||||||
} catch (IOException | ApiException e) {
|
|
||||||
if (!tempFile.delete()) {
|
|
||||||
LOG.warning("Failed to delete temporary file");
|
|
||||||
}
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
mailboxFileManager.handleDownloadedFile(tempFile);
|
|
||||||
deleteFile(file, queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void deleteFile(FolderFile file, Queue<FolderFile> queue) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(() -> apiCallDeleteFile(file, queue)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void apiCallDeleteFile(FolderFile file, Queue<FolderFile> queue)
|
|
||||||
throws IOException, ApiException {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
mailboxApi.deleteFile(mailboxProperties, file.folderId,
|
|
||||||
file.fileId);
|
|
||||||
} catch (TolerableFailureException e) {
|
|
||||||
// Catch this so we can continue to the next file
|
|
||||||
logException(LOG, INFO, e);
|
|
||||||
}
|
|
||||||
if (queue.isEmpty()) {
|
|
||||||
// List the folders with available files again to check for files
|
|
||||||
// that may have arrived while we were downloading
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state == State.DESTROYED) return;
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListFolders));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
downloadNextFile(queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onTorReachable() {
|
|
||||||
LOG.info("Our Tor hidden service is reachable");
|
|
||||||
synchronized (lock) {
|
|
||||||
if (state != State.WAITING_FOR_TOR) return;
|
|
||||||
state = State.DOWNLOAD_CYCLE_2;
|
|
||||||
// Start second download cycle
|
|
||||||
apiCall = mailboxApiCaller.retryWithBackoff(
|
|
||||||
new SimpleApiCall(this::apiCallListFolders));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Package access for testing
|
|
||||||
static class FolderFile {
|
|
||||||
|
|
||||||
final MailboxFolderId folderId;
|
|
||||||
final MailboxFileId fileId;
|
|
||||||
|
|
||||||
private FolderFile(MailboxFolderId folderId, MailboxFileId fileId) {
|
|
||||||
this.folderId = folderId;
|
|
||||||
this.fileId = fileId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
|||||||
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
||||||
import org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.FolderFile;
|
|
||||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
import org.briarproject.bramble.test.CaptureArgumentAction;
|
import org.briarproject.bramble.test.CaptureArgumentAction;
|
||||||
import org.jmock.Expectations;
|
import org.jmock.Expectations;
|
||||||
@@ -27,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
|
||||||
|
import static org.briarproject.bramble.mailbox.MailboxDownloadWorker.FolderFile;
|
||||||
import static org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.MAX_ROUND_ROBIN_FILES;
|
import static org.briarproject.bramble.mailbox.OwnMailboxDownloadWorker.MAX_ROUND_ROBIN_FILES;
|
||||||
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
||||||
import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
|
import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
|
||||||
|
|||||||
Reference in New Issue
Block a user