mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-20 14:49:53 +01:00
Merge branch '2291-mailbox-upload-worker' into 'master'
Mailbox upload worker Closes #2291 See merge request briar/briar!1673
This commit is contained in:
@@ -40,7 +40,7 @@ public class IoUtils {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void delete(File f) {
|
public static void delete(File f) {
|
||||||
if (!f.delete() && LOG.isLoggable(WARNING))
|
if (!f.delete() && LOG.isLoggable(WARNING))
|
||||||
LOG.warning("Could not delete " + f.getAbsolutePath());
|
LOG.warning("Could not delete " + f.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -146,10 +146,12 @@ class ContactMailboxDownloadWorker implements MailboxWorker,
|
|||||||
if (state == State.DOWNLOAD_CYCLE_1) {
|
if (state == State.DOWNLOAD_CYCLE_1) {
|
||||||
LOG.info("First download cycle finished");
|
LOG.info("First download cycle finished");
|
||||||
state = State.WAITING_FOR_TOR;
|
state = State.WAITING_FOR_TOR;
|
||||||
|
apiCall = null;
|
||||||
addObserver = true;
|
addObserver = true;
|
||||||
} else if (state == State.DOWNLOAD_CYCLE_2) {
|
} else if (state == State.DOWNLOAD_CYCLE_2) {
|
||||||
LOG.info("Second download cycle finished");
|
LOG.info("Second download cycle finished");
|
||||||
state = State.FINISHED;
|
state = State.FINISHED;
|
||||||
|
apiCall = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (addObserver) {
|
if (addObserver) {
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package org.briarproject.bramble.mailbox;
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.contact.ContactId;
|
||||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -16,6 +18,14 @@ interface MailboxFileManager {
|
|||||||
*/
|
*/
|
||||||
File createTempFileForDownload() throws IOException;
|
File createTempFileForDownload() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a file to be uploaded to the given contact and writes any
|
||||||
|
* waiting data to the file. The IDs of any messages sent or acked will
|
||||||
|
* be added to the given {@link OutgoingSessionRecord}.
|
||||||
|
*/
|
||||||
|
File createAndWriteTempFileForUpload(ContactId contactId,
|
||||||
|
OutgoingSessionRecord sessionRecord) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles a file that has been downloaded. The file should be created
|
* Handles a file that has been downloaded. The file should be created
|
||||||
* with {@link #createTempFileForDownload()}.
|
* with {@link #createTempFileForDownload()}.
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package org.briarproject.bramble.mailbox;
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
import org.briarproject.bramble.api.connection.ConnectionManager;
|
import org.briarproject.bramble.api.connection.ConnectionManager;
|
||||||
|
import org.briarproject.bramble.api.contact.ContactId;
|
||||||
import org.briarproject.bramble.api.event.Event;
|
import org.briarproject.bramble.api.event.Event;
|
||||||
import org.briarproject.bramble.api.event.EventBus;
|
import org.briarproject.bramble.api.event.EventBus;
|
||||||
import org.briarproject.bramble.api.event.EventListener;
|
import org.briarproject.bramble.api.event.EventListener;
|
||||||
@@ -10,13 +11,18 @@ import org.briarproject.bramble.api.mailbox.MailboxDirectory;
|
|||||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
import org.briarproject.bramble.api.plugin.PluginManager;
|
import org.briarproject.bramble.api.plugin.PluginManager;
|
||||||
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
||||||
|
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
||||||
import org.briarproject.bramble.api.plugin.event.TransportActiveEvent;
|
import org.briarproject.bramble.api.plugin.event.TransportActiveEvent;
|
||||||
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
|
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
|
||||||
import org.briarproject.bramble.api.properties.TransportProperties;
|
import org.briarproject.bramble.api.properties.TransportProperties;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
@@ -30,6 +36,7 @@ import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleS
|
|||||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID;
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID;
|
||||||
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
||||||
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
|
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
|
||||||
|
import static org.briarproject.bramble.util.IoUtils.delete;
|
||||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||||
|
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
@@ -41,6 +48,7 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
|
|||||||
|
|
||||||
// Package access for testing
|
// Package access for testing
|
||||||
static final String DOWNLOAD_DIR_NAME = "downloads";
|
static final String DOWNLOAD_DIR_NAME = "downloads";
|
||||||
|
static final String UPLOAD_DIR_NAME = "uploads";
|
||||||
|
|
||||||
private final Executor ioExecutor;
|
private final Executor ioExecutor;
|
||||||
private final PluginManager pluginManager;
|
private final PluginManager pluginManager;
|
||||||
@@ -67,14 +75,44 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public File createTempFileForDownload() throws IOException {
|
public File createTempFileForDownload() throws IOException {
|
||||||
|
return createTempFile(DOWNLOAD_DIR_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public File createAndWriteTempFileForUpload(ContactId contactId,
|
||||||
|
OutgoingSessionRecord sessionRecord) throws IOException {
|
||||||
|
File f = createTempFile(UPLOAD_DIR_NAME);
|
||||||
|
// We shouldn't reach this point until the plugin has been started
|
||||||
|
SimplexPlugin plugin =
|
||||||
|
(SimplexPlugin) requireNonNull(pluginManager.getPlugin(ID));
|
||||||
|
TransportProperties p = new TransportProperties();
|
||||||
|
p.put(PROP_PATH, f.getAbsolutePath());
|
||||||
|
TransportConnectionWriter writer = plugin.createWriter(p);
|
||||||
|
if (writer == null) {
|
||||||
|
delete(f);
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
MailboxFileWriter decorated = new MailboxFileWriter(writer);
|
||||||
|
LOG.info("Writing file for upload");
|
||||||
|
connectionManager.manageOutgoingConnection(contactId, ID, decorated,
|
||||||
|
sessionRecord);
|
||||||
|
if (decorated.awaitDisposal()) {
|
||||||
|
// An exception was thrown during the session - delete the file
|
||||||
|
delete(f);
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
private File createTempFile(String dirName) throws IOException {
|
||||||
// Wait for orphaned files to be handled before creating new files
|
// Wait for orphaned files to be handled before creating new files
|
||||||
try {
|
try {
|
||||||
orphanLatch.await();
|
orphanLatch.await();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
File downloadDir = createDirectoryIfNeeded(DOWNLOAD_DIR_NAME);
|
File dir = createDirectoryIfNeeded(dirName);
|
||||||
return File.createTempFile("mailbox", ".tmp", downloadDir);
|
return File.createTempFile("mailbox", ".tmp", dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
private File createDirectoryIfNeeded(String name) throws IOException {
|
private File createDirectoryIfNeeded(String name) throws IOException {
|
||||||
@@ -116,6 +154,8 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void eventOccurred(Event e) {
|
public void eventOccurred(Event e) {
|
||||||
|
// Wait for the transport to become active before handling orphaned
|
||||||
|
// files so that we can get the plugin from the plugin manager
|
||||||
if (e instanceof TransportActiveEvent) {
|
if (e instanceof TransportActiveEvent) {
|
||||||
TransportActiveEvent t = (TransportActiveEvent) e;
|
TransportActiveEvent t = (TransportActiveEvent) e;
|
||||||
if (t.getTransportId().equals(ID)) {
|
if (t.getTransportId().equals(ID)) {
|
||||||
@@ -127,17 +167,25 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is called at startup, as soon as the plugin is started, to
|
* This method is called at startup, as soon as the plugin is started, to
|
||||||
* handle any files that were left in the download directory at the last
|
* delete any files that were left in the upload directory at the last
|
||||||
* shutdown.
|
* shutdown and handle any files that were left in the download directory.
|
||||||
*/
|
*/
|
||||||
@IoExecutor
|
@IoExecutor
|
||||||
private void handleOrphanedFiles() {
|
private void handleOrphanedFiles() {
|
||||||
try {
|
try {
|
||||||
|
File uploadDir = createDirectoryIfNeeded(UPLOAD_DIR_NAME);
|
||||||
|
File[] orphanedUploads = uploadDir.listFiles();
|
||||||
|
if (orphanedUploads != null) {
|
||||||
|
for (File f : orphanedUploads) delete(f);
|
||||||
|
}
|
||||||
File downloadDir = createDirectoryIfNeeded(DOWNLOAD_DIR_NAME);
|
File downloadDir = createDirectoryIfNeeded(DOWNLOAD_DIR_NAME);
|
||||||
File[] orphans = downloadDir.listFiles();
|
File[] orphanedDownloads = downloadDir.listFiles();
|
||||||
// Now that we've got the list of orphans, new files can be created
|
// Now that we've got the list of orphaned downloads, new files
|
||||||
|
// can be created in the download directory
|
||||||
orphanLatch.countDown();
|
orphanLatch.countDown();
|
||||||
if (orphans != null) for (File f : orphans) handleDownloadedFile(f);
|
if (orphanedDownloads != null) {
|
||||||
|
for (File f : orphanedDownloads) handleDownloadedFile(f);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logException(LOG, WARNING, e);
|
logException(LOG, WARNING, e);
|
||||||
}
|
}
|
||||||
@@ -165,9 +213,58 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
|
|||||||
delegate.dispose(exception, recognised);
|
delegate.dispose(exception, recognised);
|
||||||
if (isHandlingComplete(exception, recognised)) {
|
if (isHandlingComplete(exception, recognised)) {
|
||||||
LOG.info("Deleting downloaded file");
|
LOG.info("Deleting downloaded file");
|
||||||
if (!file.delete()) {
|
delete(file);
|
||||||
LOG.warning("Failed to delete downloaded file");
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class MailboxFileWriter
|
||||||
|
implements TransportConnectionWriter {
|
||||||
|
|
||||||
|
private final TransportConnectionWriter delegate;
|
||||||
|
private final BlockingQueue<Boolean> disposalResult =
|
||||||
|
new ArrayBlockingQueue<>(1);
|
||||||
|
|
||||||
|
private MailboxFileWriter(TransportConnectionWriter delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMaxLatency() {
|
||||||
|
return delegate.getMaxLatency();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxIdleTime() {
|
||||||
|
return delegate.getMaxIdleTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isLossyAndCheap() {
|
||||||
|
return delegate.isLossyAndCheap();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OutputStream getOutputStream() throws IOException {
|
||||||
|
return delegate.getOutputStream();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dispose(boolean exception) throws IOException {
|
||||||
|
delegate.dispose(exception);
|
||||||
|
disposalResult.add(exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for the delegate to be disposed and returns true if an
|
||||||
|
* exception occurred.
|
||||||
|
*/
|
||||||
|
private boolean awaitDisposal() {
|
||||||
|
try {
|
||||||
|
return disposalResult.take();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info("Interrupted while waiting for disposal");
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,394 @@
|
|||||||
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.Cancellable;
|
||||||
|
import org.briarproject.bramble.api.contact.ContactId;
|
||||||
|
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||||
|
import org.briarproject.bramble.api.db.DbException;
|
||||||
|
import org.briarproject.bramble.api.event.Event;
|
||||||
|
import org.briarproject.bramble.api.event.EventBus;
|
||||||
|
import org.briarproject.bramble.api.event.EventExecutor;
|
||||||
|
import org.briarproject.bramble.api.event.EventListener;
|
||||||
|
import org.briarproject.bramble.api.lifecycle.IoExecutor;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
import org.briarproject.bramble.api.sync.MessageId;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
|
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
|
||||||
|
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
|
||||||
|
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
|
||||||
|
import org.briarproject.bramble.api.system.Clock;
|
||||||
|
import org.briarproject.bramble.api.system.TaskScheduler;
|
||||||
|
import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver;
|
||||||
|
import org.briarproject.bramble.mailbox.MailboxApi.ApiException;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import javax.annotation.concurrent.GuardedBy;
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
import static java.util.concurrent.TimeUnit.MINUTES;
|
||||||
|
import static java.util.logging.Level.INFO;
|
||||||
|
import static java.util.logging.Level.WARNING;
|
||||||
|
import static java.util.logging.Logger.getLogger;
|
||||||
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY;
|
||||||
|
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
||||||
|
import static org.briarproject.bramble.util.IoUtils.delete;
|
||||||
|
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||||
|
|
||||||
|
@ThreadSafe
|
||||||
|
@NotNullByDefault
|
||||||
|
class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
|
||||||
|
EventListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When the worker is started it checks for data to send. If data is ready
|
||||||
|
* to send, the worker waits for a connectivity check, then writes and
|
||||||
|
* uploads a file and checks again for data to send.
|
||||||
|
* <p>
|
||||||
|
* If data is due to be sent at some time in the future, the worker
|
||||||
|
* schedules a wakeup for that time and also listens for events indicating
|
||||||
|
* that new data may be ready to send.
|
||||||
|
* <p>
|
||||||
|
* If there's no data to send, the worker listens for events indicating
|
||||||
|
* that new data may be ready to send.
|
||||||
|
*/
|
||||||
|
private enum State {
|
||||||
|
CREATED,
|
||||||
|
CHECKING_FOR_DATA,
|
||||||
|
WAITING_FOR_DATA,
|
||||||
|
CONNECTIVITY_CHECK,
|
||||||
|
WRITING_UPLOADING,
|
||||||
|
DESTROYED
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
getLogger(MailboxUploadWorker.class.getName());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When we're waiting for data to send and an event indicates that new data
|
||||||
|
* may have become available, wait this long before checking the DB. This
|
||||||
|
* should help to avoid creating lots of small files when several acks or
|
||||||
|
* messages become available to send in a short period (eg when reading a
|
||||||
|
* file downloaded from a mailbox).
|
||||||
|
* <p>
|
||||||
|
* Package access for testing.
|
||||||
|
*/
|
||||||
|
static final long CHECK_DELAY_MS = 5_000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How long to wait before retrying when an exception occurs while writing
|
||||||
|
* a file.
|
||||||
|
* <p>
|
||||||
|
* Package access for testing.
|
||||||
|
*/
|
||||||
|
static final long RETRY_DELAY_MS = MINUTES.toMillis(1);
|
||||||
|
|
||||||
|
private final Executor ioExecutor;
|
||||||
|
private final DatabaseComponent db;
|
||||||
|
private final Clock clock;
|
||||||
|
private final TaskScheduler taskScheduler;
|
||||||
|
private final EventBus eventBus;
|
||||||
|
private final ConnectivityChecker connectivityChecker;
|
||||||
|
private final MailboxApiCaller mailboxApiCaller;
|
||||||
|
private final MailboxApi mailboxApi;
|
||||||
|
private final MailboxFileManager mailboxFileManager;
|
||||||
|
private final MailboxProperties mailboxProperties;
|
||||||
|
private final MailboxFolderId folderId;
|
||||||
|
private final ContactId contactId;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
private State state = State.CREATED;
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
@Nullable
|
||||||
|
private Cancellable wakeupTask = null, checkTask = null, apiCall = null;
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
@Nullable
|
||||||
|
private File file = null;
|
||||||
|
|
||||||
|
MailboxUploadWorker(@IoExecutor Executor ioExecutor,
|
||||||
|
DatabaseComponent db,
|
||||||
|
Clock clock,
|
||||||
|
TaskScheduler taskScheduler,
|
||||||
|
EventBus eventBus,
|
||||||
|
ConnectivityChecker connectivityChecker,
|
||||||
|
MailboxApiCaller mailboxApiCaller,
|
||||||
|
MailboxApi mailboxApi,
|
||||||
|
MailboxFileManager mailboxFileManager,
|
||||||
|
MailboxProperties mailboxProperties,
|
||||||
|
MailboxFolderId folderId,
|
||||||
|
ContactId contactId) {
|
||||||
|
this.ioExecutor = ioExecutor;
|
||||||
|
this.db = db;
|
||||||
|
this.clock = clock;
|
||||||
|
this.taskScheduler = taskScheduler;
|
||||||
|
this.eventBus = eventBus;
|
||||||
|
this.connectivityChecker = connectivityChecker;
|
||||||
|
this.mailboxApiCaller = mailboxApiCaller;
|
||||||
|
this.mailboxApi = mailboxApi;
|
||||||
|
this.mailboxFileManager = mailboxFileManager;
|
||||||
|
this.mailboxProperties = mailboxProperties;
|
||||||
|
this.folderId = folderId;
|
||||||
|
this.contactId = contactId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
LOG.info("Started");
|
||||||
|
synchronized (lock) {
|
||||||
|
// Don't allow the worker to be reused
|
||||||
|
if (state != State.CREATED) throw new IllegalStateException();
|
||||||
|
state = State.CHECKING_FOR_DATA;
|
||||||
|
}
|
||||||
|
ioExecutor.execute(this::checkForDataToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
LOG.info("Destroyed");
|
||||||
|
Cancellable wakeupTask, checkTask, apiCall;
|
||||||
|
File file;
|
||||||
|
synchronized (lock) {
|
||||||
|
state = State.DESTROYED;
|
||||||
|
wakeupTask = this.wakeupTask;
|
||||||
|
this.wakeupTask = null;
|
||||||
|
checkTask = this.checkTask;
|
||||||
|
this.checkTask = null;
|
||||||
|
apiCall = this.apiCall;
|
||||||
|
this.apiCall = null;
|
||||||
|
file = this.file;
|
||||||
|
this.file = null;
|
||||||
|
}
|
||||||
|
if (wakeupTask != null) wakeupTask.cancel();
|
||||||
|
if (checkTask != null) checkTask.cancel();
|
||||||
|
if (apiCall != null) apiCall.cancel();
|
||||||
|
if (file != null) delete(file);
|
||||||
|
connectivityChecker.removeObserver(this);
|
||||||
|
eventBus.removeListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@IoExecutor
|
||||||
|
private void checkForDataToSend() {
|
||||||
|
synchronized (lock) {
|
||||||
|
checkTask = null;
|
||||||
|
if (state != State.CHECKING_FOR_DATA) return;
|
||||||
|
}
|
||||||
|
LOG.info("Checking for data to send");
|
||||||
|
try {
|
||||||
|
db.transaction(true, txn -> {
|
||||||
|
long nextSendTime;
|
||||||
|
if (db.containsAcksToSend(txn, contactId)) {
|
||||||
|
nextSendTime = 0L;
|
||||||
|
} else {
|
||||||
|
nextSendTime = db.getNextSendTime(txn, contactId,
|
||||||
|
MAX_LATENCY);
|
||||||
|
}
|
||||||
|
// Handle the result on the event executor to avoid races with
|
||||||
|
// incoming events
|
||||||
|
txn.attach(() -> handleNextSendTime(nextSendTime));
|
||||||
|
});
|
||||||
|
} catch (DbException e) {
|
||||||
|
logException(LOG, WARNING, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventExecutor
|
||||||
|
private void handleNextSendTime(long nextSendTime) {
|
||||||
|
if (nextSendTime == Long.MAX_VALUE) {
|
||||||
|
// Nothing is sendable now or due to be sent in the future. Wait
|
||||||
|
// for an event indicating that new data may be ready to send
|
||||||
|
waitForDataToSend();
|
||||||
|
} else {
|
||||||
|
// Work out the delay until data's ready to send (may be negative)
|
||||||
|
long delay = nextSendTime - clock.currentTimeMillis();
|
||||||
|
if (delay > 0) {
|
||||||
|
// Schedule a wakeup when data will be ready to send. If an
|
||||||
|
// event is received in the meantime indicating that new data
|
||||||
|
// may be ready to send, we'll cancel the wakeup
|
||||||
|
scheduleWakeup(delay);
|
||||||
|
} else {
|
||||||
|
// Data is ready to send now
|
||||||
|
checkConnectivity();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventExecutor
|
||||||
|
private void waitForDataToSend() {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.CHECKING_FOR_DATA) return;
|
||||||
|
state = State.WAITING_FOR_DATA;
|
||||||
|
LOG.info("Waiting for data to send");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventExecutor
|
||||||
|
private void scheduleWakeup(long delay) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.CHECKING_FOR_DATA) return;
|
||||||
|
state = State.WAITING_FOR_DATA;
|
||||||
|
if (LOG.isLoggable(INFO)) {
|
||||||
|
LOG.info("Scheduling wakeup in " + delay + " ms");
|
||||||
|
}
|
||||||
|
wakeupTask = taskScheduler.schedule(this::wakeUp, ioExecutor,
|
||||||
|
delay, MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@IoExecutor
|
||||||
|
private void wakeUp() {
|
||||||
|
LOG.info("Woke up");
|
||||||
|
synchronized (lock) {
|
||||||
|
wakeupTask = null;
|
||||||
|
if (state != State.WAITING_FOR_DATA) return;
|
||||||
|
state = State.CHECKING_FOR_DATA;
|
||||||
|
}
|
||||||
|
checkForDataToSend();
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventExecutor
|
||||||
|
private void checkConnectivity() {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.CHECKING_FOR_DATA) return;
|
||||||
|
state = State.CONNECTIVITY_CHECK;
|
||||||
|
}
|
||||||
|
LOG.info("Checking connectivity");
|
||||||
|
// Avoid leaking observer in case destroy() is called concurrently
|
||||||
|
// before observer is added
|
||||||
|
connectivityChecker.checkConnectivity(mailboxProperties, this);
|
||||||
|
boolean destroyed;
|
||||||
|
synchronized (lock) {
|
||||||
|
destroyed = state == State.DESTROYED;
|
||||||
|
}
|
||||||
|
if (destroyed) connectivityChecker.removeObserver(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConnectivityCheckSucceeded() {
|
||||||
|
LOG.info("Connectivity check succeeded");
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.CONNECTIVITY_CHECK) return;
|
||||||
|
state = State.WRITING_UPLOADING;
|
||||||
|
}
|
||||||
|
ioExecutor.execute(this::writeAndUploadFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
@IoExecutor
|
||||||
|
private void writeAndUploadFile() {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.WRITING_UPLOADING) return;
|
||||||
|
}
|
||||||
|
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||||
|
File file;
|
||||||
|
try {
|
||||||
|
file = mailboxFileManager.createAndWriteTempFileForUpload(
|
||||||
|
contactId, sessionRecord);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logException(LOG, WARNING, e);
|
||||||
|
// Try again after a delay
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.WRITING_UPLOADING) return;
|
||||||
|
state = State.CHECKING_FOR_DATA;
|
||||||
|
checkTask = taskScheduler.schedule(this::checkForDataToSend,
|
||||||
|
ioExecutor, RETRY_DELAY_MS, MILLISECONDS);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
boolean deleteFile = false;
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state == State.WRITING_UPLOADING) {
|
||||||
|
this.file = file;
|
||||||
|
apiCall = mailboxApiCaller.retryWithBackoff(
|
||||||
|
new SimpleApiCall(() -> apiCallUploadFile(file,
|
||||||
|
sessionRecord)));
|
||||||
|
} else {
|
||||||
|
deleteFile = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (deleteFile) delete(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
@IoExecutor
|
||||||
|
private void apiCallUploadFile(File file,
|
||||||
|
OutgoingSessionRecord sessionRecord)
|
||||||
|
throws IOException, ApiException {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.WRITING_UPLOADING) return;
|
||||||
|
}
|
||||||
|
LOG.info("Uploading file");
|
||||||
|
mailboxApi.addFile(mailboxProperties, folderId, file);
|
||||||
|
markMessagesSentOrAcked(sessionRecord);
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.WRITING_UPLOADING) return;
|
||||||
|
state = State.CHECKING_FOR_DATA;
|
||||||
|
apiCall = null;
|
||||||
|
this.file = null;
|
||||||
|
}
|
||||||
|
delete(file);
|
||||||
|
checkForDataToSend();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markMessagesSentOrAcked(OutgoingSessionRecord sessionRecord) {
|
||||||
|
Collection<MessageId> acked = sessionRecord.getAckedIds();
|
||||||
|
Collection<MessageId> sent = sessionRecord.getSentIds();
|
||||||
|
try {
|
||||||
|
db.transaction(false, txn -> {
|
||||||
|
if (!acked.isEmpty()) {
|
||||||
|
db.setAckSent(txn, contactId, acked);
|
||||||
|
}
|
||||||
|
if (!sent.isEmpty()) {
|
||||||
|
db.setMessagesSent(txn, contactId, sent, MAX_LATENCY);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (DbException e) {
|
||||||
|
logException(LOG, WARNING, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void eventOccurred(Event e) {
|
||||||
|
if (e instanceof MessageToAckEvent) {
|
||||||
|
MessageToAckEvent m = (MessageToAckEvent) e;
|
||||||
|
if (m.getContactId().equals(contactId)) {
|
||||||
|
LOG.info("Message to ack");
|
||||||
|
onDataToSend();
|
||||||
|
}
|
||||||
|
} else if (e instanceof MessageSharedEvent) {
|
||||||
|
LOG.info("Message shared");
|
||||||
|
onDataToSend();
|
||||||
|
} else if (e instanceof GroupVisibilityUpdatedEvent) {
|
||||||
|
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
|
||||||
|
if (g.getVisibility() == SHARED &&
|
||||||
|
g.getAffectedContacts().contains(contactId)) {
|
||||||
|
LOG.info("Group shared");
|
||||||
|
onDataToSend();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventExecutor
|
||||||
|
private void onDataToSend() {
|
||||||
|
Cancellable wakeupTask;
|
||||||
|
synchronized (lock) {
|
||||||
|
if (state != State.WAITING_FOR_DATA) return;
|
||||||
|
state = State.CHECKING_FOR_DATA;
|
||||||
|
wakeupTask = this.wakeupTask;
|
||||||
|
this.wakeupTask = null;
|
||||||
|
// Delay the check to avoid creating lots of small files
|
||||||
|
checkTask = taskScheduler.schedule(this::checkForDataToSend,
|
||||||
|
ioExecutor, CHECK_DELAY_MS, MILLISECONDS);
|
||||||
|
}
|
||||||
|
// If we had scheduled a wakeup when data was due to be sent, cancel it
|
||||||
|
if (wakeupTask != null) wakeupTask.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
package org.briarproject.bramble.mailbox;
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.Cancellable;
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
import org.briarproject.bramble.api.mailbox.MailboxFileId;
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile;
|
||||||
@@ -7,6 +8,7 @@ import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException;
|
|||||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
import org.briarproject.bramble.test.CaptureArgumentAction;
|
import org.briarproject.bramble.test.CaptureArgumentAction;
|
||||||
import org.jmock.Expectations;
|
import org.jmock.Expectations;
|
||||||
|
import org.jmock.lib.action.DoAllAction;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@@ -36,6 +38,7 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
private final MailboxApi mailboxApi = context.mock(MailboxApi.class);
|
private final MailboxApi mailboxApi = context.mock(MailboxApi.class);
|
||||||
private final MailboxFileManager mailboxFileManager =
|
private final MailboxFileManager mailboxFileManager =
|
||||||
context.mock(MailboxFileManager.class);
|
context.mock(MailboxFileManager.class);
|
||||||
|
private final Cancellable apiCall = context.mock(Cancellable.class);
|
||||||
|
|
||||||
private final MailboxProperties mailboxProperties =
|
private final MailboxProperties mailboxProperties =
|
||||||
getMailboxProperties(false, CLIENT_SUPPORTS);
|
getMailboxProperties(false, CLIENT_SUPPORTS);
|
||||||
@@ -96,30 +99,36 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
|
|
||||||
// When the connectivity check succeeds, a list-inbox task should be
|
// When the connectivity check succeeds, a list-inbox task should be
|
||||||
// started for the first download cycle
|
// started for the first download cycle
|
||||||
AtomicReference<ApiCall> listTask = new AtomicReference<>(null);
|
AtomicReference<ApiCall> listTask = new AtomicReference<>();
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
will(new CaptureArgumentAction<>(listTask, ApiCall.class, 0));
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(listTask, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
worker.onConnectivityCheckSucceeded();
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
// When the list-inbox tasks runs and finds some files to download,
|
// When the list-inbox tasks runs and finds some files to download,
|
||||||
// it should start a download task for the first file
|
// it should start a download task for the first file
|
||||||
AtomicReference<ApiCall> downloadTask = new AtomicReference<>(null);
|
AtomicReference<ApiCall> downloadTask = new AtomicReference<>();
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
oneOf(mailboxApi).getFiles(mailboxProperties,
|
oneOf(mailboxApi).getFiles(mailboxProperties,
|
||||||
requireNonNull(mailboxProperties.getInboxId()));
|
requireNonNull(mailboxProperties.getInboxId()));
|
||||||
will(returnValue(files));
|
will(returnValue(files));
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
will(new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0));
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
assertFalse(listTask.get().callApi());
|
assertFalse(listTask.get().callApi());
|
||||||
|
|
||||||
// When the first download task runs it should download the file to the
|
// When the first download task runs it should download the file to the
|
||||||
// location provided by the file manager and start a delete task
|
// location provided by the file manager and start a delete task
|
||||||
AtomicReference<ApiCall> deleteTask = new AtomicReference<>(null);
|
AtomicReference<ApiCall> deleteTask = new AtomicReference<>();
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
oneOf(mailboxFileManager).createTempFileForDownload();
|
oneOf(mailboxFileManager).createTempFileForDownload();
|
||||||
will(returnValue(tempFile));
|
will(returnValue(tempFile));
|
||||||
@@ -128,7 +137,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
file1.name, tempFile);
|
file1.name, tempFile);
|
||||||
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
|
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
will(new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0));
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
assertFalse(downloadTask.get().callApi());
|
assertFalse(downloadTask.get().callApi());
|
||||||
@@ -140,7 +152,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
requireNonNull(mailboxProperties.getInboxId()), file1.name);
|
requireNonNull(mailboxProperties.getInboxId()), file1.name);
|
||||||
will(throwException(new TolerableFailureException()));
|
will(throwException(new TolerableFailureException()));
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
will(new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0));
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
assertFalse(deleteTask.get().callApi());
|
assertFalse(deleteTask.get().callApi());
|
||||||
@@ -155,7 +170,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
file2.name, tempFile);
|
file2.name, tempFile);
|
||||||
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
|
oneOf(mailboxFileManager).handleDownloadedFile(tempFile);
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
will(new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0));
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
assertFalse(downloadTask.get().callApi());
|
assertFalse(downloadTask.get().callApi());
|
||||||
@@ -168,7 +186,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
requireNonNull(mailboxProperties.getInboxId()), file2.name);
|
requireNonNull(mailboxProperties.getInboxId()), file2.name);
|
||||||
will(throwException(new TolerableFailureException()));
|
will(throwException(new TolerableFailureException()));
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
will(new CaptureArgumentAction<>(listTask, ApiCall.class, 0));
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(listTask, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
assertFalse(deleteTask.get().callApi());
|
assertFalse(deleteTask.get().callApi());
|
||||||
@@ -188,7 +209,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase {
|
|||||||
// be started for the second download cycle
|
// be started for the second download cycle
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
will(new CaptureArgumentAction<>(listTask, ApiCall.class, 0));
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(listTask, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
}});
|
}});
|
||||||
|
|
||||||
worker.onTorReachable();
|
worker.onTorReachable();
|
||||||
|
|||||||
@@ -2,16 +2,20 @@ package org.briarproject.bramble.mailbox;
|
|||||||
|
|
||||||
import org.briarproject.bramble.api.connection.ConnectionManager;
|
import org.briarproject.bramble.api.connection.ConnectionManager;
|
||||||
import org.briarproject.bramble.api.connection.ConnectionManager.TagController;
|
import org.briarproject.bramble.api.connection.ConnectionManager.TagController;
|
||||||
|
import org.briarproject.bramble.api.contact.ContactId;
|
||||||
import org.briarproject.bramble.api.event.EventBus;
|
import org.briarproject.bramble.api.event.EventBus;
|
||||||
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
|
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
|
||||||
import org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState;
|
import org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState;
|
||||||
import org.briarproject.bramble.api.plugin.PluginManager;
|
import org.briarproject.bramble.api.plugin.PluginManager;
|
||||||
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
||||||
|
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
||||||
import org.briarproject.bramble.api.plugin.event.TransportActiveEvent;
|
import org.briarproject.bramble.api.plugin.event.TransportActiveEvent;
|
||||||
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
|
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
|
||||||
import org.briarproject.bramble.api.properties.TransportProperties;
|
import org.briarproject.bramble.api.properties.TransportProperties;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
import org.briarproject.bramble.test.CaptureArgumentAction;
|
import org.briarproject.bramble.test.CaptureArgumentAction;
|
||||||
|
import org.briarproject.bramble.test.ConsumeArgumentAction;
|
||||||
import org.briarproject.bramble.test.RunAction;
|
import org.briarproject.bramble.test.RunAction;
|
||||||
import org.jmock.Expectations;
|
import org.jmock.Expectations;
|
||||||
import org.jmock.lib.action.DoAllAction;
|
import org.jmock.lib.action.DoAllAction;
|
||||||
@@ -20,6 +24,7 @@ import org.junit.Before;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
@@ -28,11 +33,14 @@ import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleS
|
|||||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID;
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID;
|
||||||
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
|
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
|
||||||
import static org.briarproject.bramble.mailbox.MailboxFileManagerImpl.DOWNLOAD_DIR_NAME;
|
import static org.briarproject.bramble.mailbox.MailboxFileManagerImpl.DOWNLOAD_DIR_NAME;
|
||||||
|
import static org.briarproject.bramble.mailbox.MailboxFileManagerImpl.UPLOAD_DIR_NAME;
|
||||||
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getContactId;
|
||||||
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
|
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
||||||
|
|
||||||
@@ -47,6 +55,10 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
|||||||
private final SimplexPlugin plugin = context.mock(SimplexPlugin.class);
|
private final SimplexPlugin plugin = context.mock(SimplexPlugin.class);
|
||||||
private final TransportConnectionReader transportConnectionReader =
|
private final TransportConnectionReader transportConnectionReader =
|
||||||
context.mock(TransportConnectionReader.class);
|
context.mock(TransportConnectionReader.class);
|
||||||
|
private final TransportConnectionWriter transportConnectionWriter =
|
||||||
|
context.mock(TransportConnectionWriter.class);
|
||||||
|
|
||||||
|
private final ContactId contactId = getContactId();
|
||||||
|
|
||||||
private File mailboxDir;
|
private File mailboxDir;
|
||||||
private MailboxFileManagerImpl manager;
|
private MailboxFileManagerImpl manager;
|
||||||
@@ -65,17 +77,25 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHandlesOrphanedFilesAtStartup() throws Exception {
|
public void testHandlesOrphanedFilesAtStartup() throws Exception {
|
||||||
// Create an orphaned file, left behind at the previous shutdown
|
// Create an orphaned upload, left behind at the previous shutdown
|
||||||
|
File uploadDir = new File(mailboxDir, UPLOAD_DIR_NAME);
|
||||||
|
//noinspection ResultOfMethodCallIgnored
|
||||||
|
uploadDir.mkdirs();
|
||||||
|
File orphanedUpload = new File(uploadDir, "orphan");
|
||||||
|
assertTrue(orphanedUpload.createNewFile());
|
||||||
|
|
||||||
|
// Create an orphaned download, left behind at the previous shutdown
|
||||||
File downloadDir = new File(mailboxDir, DOWNLOAD_DIR_NAME);
|
File downloadDir = new File(mailboxDir, DOWNLOAD_DIR_NAME);
|
||||||
//noinspection ResultOfMethodCallIgnored
|
//noinspection ResultOfMethodCallIgnored
|
||||||
downloadDir.mkdirs();
|
downloadDir.mkdirs();
|
||||||
File orphan = new File(downloadDir, "orphan");
|
File orphanedDownload = new File(downloadDir, "orphan");
|
||||||
assertTrue(orphan.createNewFile());
|
assertTrue(orphanedDownload.createNewFile());
|
||||||
|
|
||||||
TransportProperties props = new TransportProperties();
|
TransportProperties props = new TransportProperties();
|
||||||
props.put(PROP_PATH, orphan.getAbsolutePath());
|
props.put(PROP_PATH, orphanedDownload.getAbsolutePath());
|
||||||
|
|
||||||
// When the plugin becomes active the orphaned file should be handled
|
// When the plugin becomes active the orphaned upload should be deleted
|
||||||
|
// and the orphaned download should be handled
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
oneOf(ioExecutor).execute(with(any(Runnable.class)));
|
oneOf(ioExecutor).execute(with(any(Runnable.class)));
|
||||||
will(new RunAction());
|
will(new RunAction());
|
||||||
@@ -90,10 +110,12 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
|||||||
}});
|
}});
|
||||||
|
|
||||||
manager.eventOccurred(new TransportActiveEvent(ID));
|
manager.eventOccurred(new TransportActiveEvent(ID));
|
||||||
|
|
||||||
|
assertFalse(orphanedUpload.exists());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeletesFileWhenReadSucceeds() throws Exception {
|
public void testDeletesDownloadedFileWhenReadSucceeds() throws Exception {
|
||||||
expectCheckForOrphans();
|
expectCheckForOrphans();
|
||||||
manager.eventOccurred(new TransportActiveEvent(ID));
|
manager.eventOccurred(new TransportActiveEvent(ID));
|
||||||
|
|
||||||
@@ -102,7 +124,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
|||||||
new AtomicReference<>(null);
|
new AtomicReference<>(null);
|
||||||
AtomicReference<TagController> controller = new AtomicReference<>(null);
|
AtomicReference<TagController> controller = new AtomicReference<>(null);
|
||||||
|
|
||||||
expectPassFileToConnectionManager(f, reader, controller);
|
expectPassDownloadedFileToConnectionManager(f, reader, controller);
|
||||||
manager.handleDownloadedFile(f);
|
manager.handleDownloadedFile(f);
|
||||||
|
|
||||||
// The read is successful, so the tag controller should allow the tag
|
// The read is successful, so the tag controller should allow the tag
|
||||||
@@ -117,29 +139,117 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeletesFileWhenTagIsNotRecognised() throws Exception {
|
public void testDeletesDownloadedFileWhenTagIsNotRecognised()
|
||||||
testDeletesFile(false, RUNNING, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDeletesFileWhenReadFails() throws Exception {
|
|
||||||
testDeletesFile(true, RUNNING, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDoesNotDeleteFileWhenTagIsNotRecognisedAtShutdown()
|
|
||||||
throws Exception {
|
throws Exception {
|
||||||
testDeletesFile(false, STOPPING, true);
|
testDeletesDownloadedFile(false, RUNNING, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDoesNotDeleteFileWhenReadFailsAtShutdown()
|
public void testDeletesDownloadedFileWhenReadFails() throws Exception {
|
||||||
throws Exception {
|
testDeletesDownloadedFile(true, RUNNING, false);
|
||||||
testDeletesFile(true, STOPPING, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testDeletesFile(boolean recognised, LifecycleState state,
|
@Test
|
||||||
boolean fileExists) throws Exception {
|
public void testDoesNotDeleteDownloadedFileWhenTagIsNotRecognisedAtShutdown()
|
||||||
|
throws Exception {
|
||||||
|
testDeletesDownloadedFile(false, STOPPING, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDoesNotDeleteDownloadedFileWhenReadFailsAtShutdown()
|
||||||
|
throws Exception {
|
||||||
|
testDeletesDownloadedFile(true, STOPPING, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testThrowsExceptionIfPluginFailsToCreateWriter()
|
||||||
|
throws Exception {
|
||||||
|
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||||
|
|
||||||
|
expectCheckForOrphans();
|
||||||
|
manager.eventOccurred(new TransportActiveEvent(ID));
|
||||||
|
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(pluginManager).getPlugin(ID);
|
||||||
|
will(returnValue(plugin));
|
||||||
|
oneOf(plugin).createWriter(with(any(TransportProperties.class)));
|
||||||
|
will(returnValue(null));
|
||||||
|
}});
|
||||||
|
|
||||||
|
manager.createAndWriteTempFileForUpload(contactId, sessionRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testThrowsExceptionIfSessionFailsWithException()
|
||||||
|
throws Exception {
|
||||||
|
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||||
|
|
||||||
|
expectCheckForOrphans();
|
||||||
|
manager.eventOccurred(new TransportActiveEvent(ID));
|
||||||
|
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(pluginManager).getPlugin(ID);
|
||||||
|
will(returnValue(plugin));
|
||||||
|
oneOf(plugin).createWriter(with(any(TransportProperties.class)));
|
||||||
|
will(returnValue(transportConnectionWriter));
|
||||||
|
oneOf(transportConnectionWriter).dispose(true);
|
||||||
|
oneOf(connectionManager).manageOutgoingConnection(with(contactId),
|
||||||
|
with(ID), with(any(TransportConnectionWriter.class)),
|
||||||
|
with(sessionRecord));
|
||||||
|
// The session fails with an exception. We need to use an action
|
||||||
|
// for this, as createAndWriteTempFileForUpload() waits for it to
|
||||||
|
// happen before returning
|
||||||
|
will(new ConsumeArgumentAction<>(TransportConnectionWriter.class, 2,
|
||||||
|
writer -> {
|
||||||
|
try {
|
||||||
|
writer.dispose(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}});
|
||||||
|
|
||||||
|
manager.createAndWriteTempFileForUpload(contactId, sessionRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReturnsFileIfSessionSucceeds() throws Exception {
|
||||||
|
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||||
|
|
||||||
|
expectCheckForOrphans();
|
||||||
|
manager.eventOccurred(new TransportActiveEvent(ID));
|
||||||
|
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(pluginManager).getPlugin(ID);
|
||||||
|
will(returnValue(plugin));
|
||||||
|
oneOf(plugin).createWriter(with(any(TransportProperties.class)));
|
||||||
|
will(returnValue(transportConnectionWriter));
|
||||||
|
oneOf(transportConnectionWriter).dispose(false);
|
||||||
|
oneOf(connectionManager).manageOutgoingConnection(with(contactId),
|
||||||
|
with(ID), with(any(TransportConnectionWriter.class)),
|
||||||
|
with(sessionRecord));
|
||||||
|
// The session succeeds. We need to use an action for this, as
|
||||||
|
// createAndWriteTempFileForUpload() waits for it to happen before
|
||||||
|
// returning
|
||||||
|
will(new ConsumeArgumentAction<>(TransportConnectionWriter.class, 2,
|
||||||
|
writer -> {
|
||||||
|
try {
|
||||||
|
writer.dispose(false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}});
|
||||||
|
|
||||||
|
File f = manager.createAndWriteTempFileForUpload(contactId,
|
||||||
|
sessionRecord);
|
||||||
|
assertTrue(f.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testDeletesDownloadedFile(boolean recognised,
|
||||||
|
LifecycleState state, boolean fileExists) throws Exception {
|
||||||
expectCheckForOrphans();
|
expectCheckForOrphans();
|
||||||
manager.eventOccurred(new TransportActiveEvent(ID));
|
manager.eventOccurred(new TransportActiveEvent(ID));
|
||||||
|
|
||||||
@@ -148,7 +258,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
|||||||
new AtomicReference<>(null);
|
new AtomicReference<>(null);
|
||||||
AtomicReference<TagController> controller = new AtomicReference<>(null);
|
AtomicReference<TagController> controller = new AtomicReference<>(null);
|
||||||
|
|
||||||
expectPassFileToConnectionManager(f, reader, controller);
|
expectPassDownloadedFileToConnectionManager(f, reader, controller);
|
||||||
manager.handleDownloadedFile(f);
|
manager.handleDownloadedFile(f);
|
||||||
|
|
||||||
context.checking(new Expectations() {{
|
context.checking(new Expectations() {{
|
||||||
@@ -169,7 +279,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
|
|||||||
}});
|
}});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expectPassFileToConnectionManager(File f,
|
private void expectPassDownloadedFileToConnectionManager(File f,
|
||||||
AtomicReference<TransportConnectionReader> reader,
|
AtomicReference<TransportConnectionReader> reader,
|
||||||
AtomicReference<TagController> controller) {
|
AtomicReference<TagController> controller) {
|
||||||
TransportProperties props = new TransportProperties();
|
TransportProperties props = new TransportProperties();
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ import org.briarproject.bramble.api.mailbox.MailboxSettingsManager;
|
|||||||
import org.briarproject.bramble.api.mailbox.MailboxUpdate;
|
import org.briarproject.bramble.api.mailbox.MailboxUpdate;
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxUpdateManager;
|
import org.briarproject.bramble.api.mailbox.MailboxUpdateManager;
|
||||||
import org.briarproject.bramble.api.mailbox.MailboxVersion;
|
import org.briarproject.bramble.api.mailbox.MailboxVersion;
|
||||||
import org.briarproject.bramble.api.mailbox.event.OwnMailboxConnectionStatusEvent;
|
|
||||||
import org.briarproject.bramble.api.system.Clock;
|
import org.briarproject.bramble.api.system.Clock;
|
||||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
import org.briarproject.bramble.test.DbExpectations;
|
import org.briarproject.bramble.test.DbExpectations;
|
||||||
@@ -33,7 +32,6 @@ import static java.util.Collections.singletonList;
|
|||||||
import static org.briarproject.bramble.test.TestUtils.getContact;
|
import static org.briarproject.bramble.test.TestUtils.getContact;
|
||||||
import static org.briarproject.bramble.test.TestUtils.getRandomBytes;
|
import static org.briarproject.bramble.test.TestUtils.getRandomBytes;
|
||||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||||
import static org.briarproject.bramble.test.TestUtils.hasEvent;
|
|
||||||
import static org.briarproject.bramble.util.StringUtils.getRandomString;
|
import static org.briarproject.bramble.util.StringUtils.getRandomString;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@@ -138,7 +136,6 @@ public class MailboxPairingTaskImplTest extends BrambleMockTestCase {
|
|||||||
i.getAndIncrement();
|
i.getAndIncrement();
|
||||||
});
|
});
|
||||||
task.run();
|
task.run();
|
||||||
hasEvent(txn, OwnMailboxConnectionStatusEvent.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -163,7 +163,7 @@ public class MailboxSettingsManagerImplTest extends BrambleMockTestCase {
|
|||||||
}});
|
}});
|
||||||
|
|
||||||
manager.recordSuccessfulConnection(txn, now);
|
manager.recordSuccessfulConnection(txn, now);
|
||||||
hasEvent(txn, OwnMailboxConnectionStatusEvent.class);
|
assertTrue(hasEvent(txn, OwnMailboxConnectionStatusEvent.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -0,0 +1,465 @@
|
|||||||
|
package org.briarproject.bramble.mailbox;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.Cancellable;
|
||||||
|
import org.briarproject.bramble.api.contact.ContactId;
|
||||||
|
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||||
|
import org.briarproject.bramble.api.db.Transaction;
|
||||||
|
import org.briarproject.bramble.api.event.EventBus;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxFolderId;
|
||||||
|
import org.briarproject.bramble.api.mailbox.MailboxProperties;
|
||||||
|
import org.briarproject.bramble.api.sync.MessageId;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
|
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
|
||||||
|
import org.briarproject.bramble.api.system.Clock;
|
||||||
|
import org.briarproject.bramble.api.system.TaskScheduler;
|
||||||
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
|
import org.briarproject.bramble.test.CaptureArgumentAction;
|
||||||
|
import org.briarproject.bramble.test.ConsumeArgumentAction;
|
||||||
|
import org.briarproject.bramble.test.DbExpectations;
|
||||||
|
import org.briarproject.bramble.test.RunAction;
|
||||||
|
import org.jmock.Expectations;
|
||||||
|
import org.jmock.lib.action.DoAllAction;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static java.util.Collections.singletonList;
|
||||||
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
|
||||||
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY;
|
||||||
|
import static org.briarproject.bramble.mailbox.MailboxUploadWorker.CHECK_DELAY_MS;
|
||||||
|
import static org.briarproject.bramble.mailbox.MailboxUploadWorker.RETRY_DELAY_MS;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getContactId;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getMailboxProperties;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||||
|
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class MailboxUploadWorkerTest extends BrambleMockTestCase {
|
||||||
|
|
||||||
|
private final Executor ioExecutor = context.mock(Executor.class);
|
||||||
|
private final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||||
|
private final Clock clock = context.mock(Clock.class);
|
||||||
|
private final TaskScheduler taskScheduler =
|
||||||
|
context.mock(TaskScheduler.class);
|
||||||
|
private final EventBus eventBus = context.mock(EventBus.class);
|
||||||
|
private final ConnectivityChecker connectivityChecker =
|
||||||
|
context.mock(ConnectivityChecker.class);
|
||||||
|
private final MailboxApiCaller mailboxApiCaller =
|
||||||
|
context.mock(MailboxApiCaller.class);
|
||||||
|
private final MailboxApi mailboxApi = context.mock(MailboxApi.class);
|
||||||
|
private final MailboxFileManager mailboxFileManager =
|
||||||
|
context.mock(MailboxFileManager.class);
|
||||||
|
private final Cancellable apiCall =
|
||||||
|
context.mock(Cancellable.class, "apiCall");
|
||||||
|
private final Cancellable wakeupTask =
|
||||||
|
context.mock(Cancellable.class, "wakeupTask");
|
||||||
|
private final Cancellable checkTask =
|
||||||
|
context.mock(Cancellable.class, "checkTask");
|
||||||
|
|
||||||
|
private final MailboxProperties mailboxProperties =
|
||||||
|
getMailboxProperties(false, CLIENT_SUPPORTS);
|
||||||
|
private final long now = System.currentTimeMillis();
|
||||||
|
private final MailboxFolderId folderId = new MailboxFolderId(getRandomId());
|
||||||
|
private final ContactId contactId = getContactId();
|
||||||
|
private final MessageId ackedId = new MessageId(getRandomId());
|
||||||
|
private final MessageId sentId = new MessageId(getRandomId());
|
||||||
|
private final MessageId newMessageId = new MessageId(getRandomId());
|
||||||
|
|
||||||
|
private File testDir, tempFile;
|
||||||
|
private MailboxUploadWorker worker;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
testDir = getTestDirectory();
|
||||||
|
tempFile = new File(testDir, "temp");
|
||||||
|
worker = new MailboxUploadWorker(ioExecutor, db, clock, taskScheduler,
|
||||||
|
eventBus, connectivityChecker, mailboxApiCaller, mailboxApi,
|
||||||
|
mailboxFileManager, mailboxProperties, folderId, contactId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
deleteTestDirectory(testDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChecksForDataWhenStartedAndRemovesObserverWhenDestroyed()
|
||||||
|
throws Exception {
|
||||||
|
// When the worker is started it should check for data to send
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
expectCheckForDataToSendNoDataWaiting();
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// observer and event listener
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChecksConnectivityWhenStartedIfDataIsReady()
|
||||||
|
throws Exception {
|
||||||
|
Transaction recordTxn = new Transaction(null, false);
|
||||||
|
|
||||||
|
// When the worker is started it should check for data to send. As
|
||||||
|
// there's data ready to send immediately, the worker should start a
|
||||||
|
// connectivity check
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
expectCheckForDataToSendAndStartConnectivityCheck();
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// Create the temporary file so we can test that it gets deleted
|
||||||
|
assertTrue(testDir.mkdirs());
|
||||||
|
assertTrue(tempFile.createNewFile());
|
||||||
|
|
||||||
|
// When the connectivity check succeeds, the worker should write a file
|
||||||
|
// and start an upload task
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
AtomicReference<ApiCall> upload = new AtomicReference<>();
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxFileManager).createAndWriteTempFileForUpload(
|
||||||
|
with(contactId), with(any(OutgoingSessionRecord.class)));
|
||||||
|
will(new DoAllAction(
|
||||||
|
// Record some IDs as acked and sent
|
||||||
|
new ConsumeArgumentAction<>(OutgoingSessionRecord.class, 1,
|
||||||
|
record -> {
|
||||||
|
record.onAckSent(singletonList(ackedId));
|
||||||
|
record.onMessageSent(sentId);
|
||||||
|
}),
|
||||||
|
returnValue(tempFile)
|
||||||
|
));
|
||||||
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(upload, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
|
}});
|
||||||
|
|
||||||
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
|
// When the upload task runs, it should upload the file, record
|
||||||
|
// the acked/sent messages in the DB, and check for more data to send
|
||||||
|
context.checking(new DbExpectations() {{
|
||||||
|
oneOf(mailboxApi).addFile(mailboxProperties, folderId, tempFile);
|
||||||
|
oneOf(db).transaction(with(false), withDbRunnable(recordTxn));
|
||||||
|
oneOf(db).setAckSent(recordTxn, contactId, singletonList(ackedId));
|
||||||
|
oneOf(db).setMessagesSent(recordTxn, contactId,
|
||||||
|
singletonList(sentId), MAX_LATENCY);
|
||||||
|
}});
|
||||||
|
expectCheckForDataToSendNoDataWaiting();
|
||||||
|
|
||||||
|
assertFalse(upload.get().callApi());
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// observer and event listener
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
|
||||||
|
// The file should have been deleted
|
||||||
|
assertFalse(tempFile.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelsApiCallWhenDestroyed() throws Exception {
|
||||||
|
// When the worker is started it should check for data to send. As
|
||||||
|
// there's data ready to send immediately, the worker should start a
|
||||||
|
// connectivity check
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
expectCheckForDataToSendAndStartConnectivityCheck();
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// Create the temporary file so we can test that it gets deleted
|
||||||
|
assertTrue(testDir.mkdirs());
|
||||||
|
assertTrue(tempFile.createNewFile());
|
||||||
|
|
||||||
|
// When the connectivity check succeeds, the worker should write a file
|
||||||
|
// and start an upload task
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
AtomicReference<ApiCall> upload = new AtomicReference<>();
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxFileManager).createAndWriteTempFileForUpload(
|
||||||
|
with(contactId), with(any(OutgoingSessionRecord.class)));
|
||||||
|
will(new DoAllAction(
|
||||||
|
// Record some IDs as acked and sent
|
||||||
|
new ConsumeArgumentAction<>(OutgoingSessionRecord.class, 1,
|
||||||
|
record -> {
|
||||||
|
record.onAckSent(singletonList(ackedId));
|
||||||
|
record.onMessageSent(sentId);
|
||||||
|
}),
|
||||||
|
returnValue(tempFile)
|
||||||
|
));
|
||||||
|
oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class)));
|
||||||
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(upload, ApiCall.class, 0),
|
||||||
|
returnValue(apiCall)
|
||||||
|
));
|
||||||
|
}});
|
||||||
|
|
||||||
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// observer and event listener and cancel the upload task
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(apiCall).cancel();
|
||||||
|
}});
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
|
||||||
|
// The file should have been deleted
|
||||||
|
assertFalse(tempFile.exists());
|
||||||
|
|
||||||
|
// If the upload task runs anyway (cancellation came too late), it
|
||||||
|
// should return early when it finds the state has changed
|
||||||
|
assertFalse(upload.get().callApi());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulesWakeupWhenStartedIfDataIsNotReady()
|
||||||
|
throws Exception {
|
||||||
|
// When the worker is started it should check for data to send. As
|
||||||
|
// the data isn't ready to send immediately, the worker should
|
||||||
|
// schedule a wakeup
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
AtomicReference<Runnable> wakeup = new AtomicReference<>();
|
||||||
|
expectCheckForDataToSendAndScheduleWakeup(wakeup);
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the wakeup task runs it should check for data to send
|
||||||
|
expectCheckForDataToSendNoDataWaiting();
|
||||||
|
|
||||||
|
wakeup.get().run();
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// observer and event listener
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelsWakeupIfDestroyedBeforeWakingUp() throws Exception {
|
||||||
|
// When the worker is started it should check for data to send. As
|
||||||
|
// the data isn't ready to send immediately, the worker should
|
||||||
|
// schedule a wakeup
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
AtomicReference<Runnable> wakeup = new AtomicReference<>();
|
||||||
|
expectCheckForDataToSendAndScheduleWakeup(wakeup);
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the worker is destroyed it should cancel the wakeup and
|
||||||
|
// remove the connectivity observer and event listener
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(wakeupTask).cancel();
|
||||||
|
}});
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
|
||||||
|
// If the wakeup task runs anyway (cancellation came too late), it
|
||||||
|
// should return early without doing anything
|
||||||
|
wakeup.get().run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelsWakeupIfEventIsReceivedBeforeWakingUp()
|
||||||
|
throws Exception {
|
||||||
|
// When the worker is started it should check for data to send. As
|
||||||
|
// the data isn't ready to send immediately, the worker should
|
||||||
|
// schedule a wakeup
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
AtomicReference<Runnable> wakeup = new AtomicReference<>();
|
||||||
|
expectCheckForDataToSendAndScheduleWakeup(wakeup);
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// Before the wakeup task runs, the worker receives an event that
|
||||||
|
// indicates new data may be available. The worker should cancel the
|
||||||
|
// wakeup task and schedule a check for new data after a short delay
|
||||||
|
AtomicReference<Runnable> check = new AtomicReference<>();
|
||||||
|
expectScheduleCheck(check, CHECK_DELAY_MS);
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(wakeupTask).cancel();
|
||||||
|
}});
|
||||||
|
|
||||||
|
worker.eventOccurred(new MessageSharedEvent(newMessageId));
|
||||||
|
|
||||||
|
// If the wakeup task runs anyway (cancellation came too late), it
|
||||||
|
// should return early when it finds the state has changed
|
||||||
|
wakeup.get().run();
|
||||||
|
|
||||||
|
// Before the check task runs, the worker receives another event that
|
||||||
|
// indicates new data may be available. The event should be ignored,
|
||||||
|
// as a check for new data has already been scheduled
|
||||||
|
worker.eventOccurred(new MessageSharedEvent(newMessageId));
|
||||||
|
|
||||||
|
// When the check task runs, it should check for new data
|
||||||
|
expectCheckForDataToSendNoDataWaiting();
|
||||||
|
|
||||||
|
check.get().run();
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// observer and event listener
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelsCheckWhenDestroyed() throws Exception {
|
||||||
|
// When the worker is started it should check for data to send
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
expectCheckForDataToSendNoDataWaiting();
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// The worker receives an event that indicates new data may be
|
||||||
|
// available. The worker should schedule a check for new data after
|
||||||
|
// a short delay
|
||||||
|
AtomicReference<Runnable> check = new AtomicReference<>();
|
||||||
|
expectScheduleCheck(check, CHECK_DELAY_MS);
|
||||||
|
|
||||||
|
worker.eventOccurred(new MessageSharedEvent(newMessageId));
|
||||||
|
|
||||||
|
// When the worker is destroyed it should cancel the check and
|
||||||
|
// remove the connectivity observer and event listener
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(checkTask).cancel();
|
||||||
|
}});
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
|
||||||
|
// If the check runs anyway (cancellation came too late), it should
|
||||||
|
// return early when it finds the state has changed
|
||||||
|
check.get().run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetriesAfterDelayIfExceptionOccursWhileWritingFile()
|
||||||
|
throws Exception {
|
||||||
|
// When the worker is started it should check for data to send. As
|
||||||
|
// there's data ready to send immediately, the worker should start a
|
||||||
|
// connectivity check
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
expectCheckForDataToSendAndStartConnectivityCheck();
|
||||||
|
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
// When the connectivity check succeeds, the worker should try to
|
||||||
|
// write a file. This fails with an exception, so the worker should
|
||||||
|
// retry by scheduling a check for new data after a short delay
|
||||||
|
expectRunTaskOnIoExecutor();
|
||||||
|
AtomicReference<Runnable> check = new AtomicReference<>();
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(mailboxFileManager).createAndWriteTempFileForUpload(
|
||||||
|
with(contactId), with(any(OutgoingSessionRecord.class)));
|
||||||
|
will(throwException(new IOException())); // Oh noes!
|
||||||
|
}});
|
||||||
|
expectScheduleCheck(check, RETRY_DELAY_MS);
|
||||||
|
|
||||||
|
worker.onConnectivityCheckSucceeded();
|
||||||
|
|
||||||
|
// When the check task runs it should check for new data
|
||||||
|
expectCheckForDataToSendNoDataWaiting();
|
||||||
|
|
||||||
|
check.get().run();
|
||||||
|
|
||||||
|
// When the worker is destroyed it should remove the connectivity
|
||||||
|
// observer and event listener
|
||||||
|
expectRemoveObserverAndListener();
|
||||||
|
|
||||||
|
worker.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectRunTaskOnIoExecutor() {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(ioExecutor).execute(with(any(Runnable.class)));
|
||||||
|
will(new RunAction());
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectCheckForDataToSendNoDataWaiting() throws Exception {
|
||||||
|
Transaction txn = new Transaction(null, true);
|
||||||
|
|
||||||
|
context.checking(new DbExpectations() {{
|
||||||
|
oneOf(db).transaction(with(true), withDbRunnable(txn));
|
||||||
|
oneOf(db).containsAcksToSend(txn, contactId);
|
||||||
|
will(returnValue(false));
|
||||||
|
oneOf(db).getNextSendTime(txn, contactId, MAX_LATENCY);
|
||||||
|
will(returnValue(Long.MAX_VALUE)); // No data waiting
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectCheckForDataToSendAndScheduleWakeup(
|
||||||
|
AtomicReference<Runnable> wakeup) throws Exception {
|
||||||
|
Transaction txn = new Transaction(null, true);
|
||||||
|
|
||||||
|
context.checking(new DbExpectations() {{
|
||||||
|
oneOf(db).transaction(with(true), withDbRunnable(txn));
|
||||||
|
oneOf(db).containsAcksToSend(txn, contactId);
|
||||||
|
will(returnValue(false));
|
||||||
|
oneOf(db).getNextSendTime(txn, contactId, MAX_LATENCY);
|
||||||
|
will(returnValue(now + 1234L)); // Data waiting but not ready
|
||||||
|
oneOf(clock).currentTimeMillis();
|
||||||
|
will(returnValue(now));
|
||||||
|
oneOf(taskScheduler).schedule(with(any(Runnable.class)),
|
||||||
|
with(ioExecutor), with(1234L), with(MILLISECONDS));
|
||||||
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(wakeup, Runnable.class, 0),
|
||||||
|
returnValue(wakeupTask)
|
||||||
|
));
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectCheckForDataToSendAndStartConnectivityCheck()
|
||||||
|
throws Exception {
|
||||||
|
Transaction txn = new Transaction(null, true);
|
||||||
|
|
||||||
|
context.checking(new DbExpectations() {{
|
||||||
|
oneOf(db).transaction(with(true), withDbRunnable(txn));
|
||||||
|
oneOf(db).containsAcksToSend(txn, contactId);
|
||||||
|
will(returnValue(false));
|
||||||
|
oneOf(db).getNextSendTime(txn, contactId, MAX_LATENCY);
|
||||||
|
will(returnValue(0L)); // Data ready to send
|
||||||
|
oneOf(clock).currentTimeMillis();
|
||||||
|
will(returnValue(now));
|
||||||
|
oneOf(connectivityChecker).checkConnectivity(mailboxProperties,
|
||||||
|
worker);
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectScheduleCheck(AtomicReference<Runnable> check,
|
||||||
|
long delay) {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(taskScheduler).schedule(with(any(Runnable.class)),
|
||||||
|
with(ioExecutor), with(delay), with(MILLISECONDS));
|
||||||
|
will(new DoAllAction(
|
||||||
|
new CaptureArgumentAction<>(check, Runnable.class, 0),
|
||||||
|
returnValue(checkTask)
|
||||||
|
));
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expectRemoveObserverAndListener() {
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(connectivityChecker).removeObserver(worker);
|
||||||
|
oneOf(eventBus).removeListener(worker);
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
package org.briarproject.bramble.test;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.Consumer;
|
||||||
|
import org.hamcrest.Description;
|
||||||
|
import org.jmock.api.Action;
|
||||||
|
import org.jmock.api.Invocation;
|
||||||
|
|
||||||
|
public class ConsumeArgumentAction<T> implements Action {
|
||||||
|
|
||||||
|
private final Class<T> capturedClass;
|
||||||
|
private final int index;
|
||||||
|
private final Consumer<T> consumer;
|
||||||
|
|
||||||
|
public ConsumeArgumentAction(Class<T> capturedClass, int index,
|
||||||
|
Consumer<T> consumer) {
|
||||||
|
this.capturedClass = capturedClass;
|
||||||
|
this.index = index;
|
||||||
|
this.consumer = consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object invoke(Invocation invocation) {
|
||||||
|
consumer.accept(capturedClass.cast(invocation.getParameter(index)));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void describeTo(Description description) {
|
||||||
|
description.appendText("passes an argument to a consumer");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user