diff --git a/bramble-api/src/main/java/org/briarproject/bramble/util/IoUtils.java b/bramble-api/src/main/java/org/briarproject/bramble/util/IoUtils.java index d94f289f9..3c07f76c3 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/util/IoUtils.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/util/IoUtils.java @@ -40,7 +40,7 @@ public class IoUtils { } } - private static void delete(File f) { + public static void delete(File f) { if (!f.delete() && LOG.isLoggable(WARNING)) LOG.warning("Could not delete " + f.getAbsolutePath()); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorker.java index 70e4266f6..2994dbbc4 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorker.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorker.java @@ -146,10 +146,12 @@ class ContactMailboxDownloadWorker implements MailboxWorker, if (state == State.DOWNLOAD_CYCLE_1) { LOG.info("First download cycle finished"); state = State.WAITING_FOR_TOR; + apiCall = null; addObserver = true; } else if (state == State.DOWNLOAD_CYCLE_2) { LOG.info("Second download cycle finished"); state = State.FINISHED; + apiCall = null; } } if (addObserver) { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManager.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManager.java index 0320b1bd5..91553a7d6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManager.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManager.java @@ -1,6 +1,8 @@ package org.briarproject.bramble.mailbox; +import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import java.io.File; import java.io.IOException; @@ -16,6 +18,14 @@ interface MailboxFileManager { */ File createTempFileForDownload() throws IOException; + /** + * Creates a file to be uploaded to the given contact and writes any + * waiting data to the file. The IDs of any messages sent or acked will + * be added to the given {@link OutgoingSessionRecord}. + */ + File createAndWriteTempFileForUpload(ContactId contactId, + OutgoingSessionRecord sessionRecord) throws IOException; + /** * Handles a file that has been downloaded. The file should be created * with {@link #createTempFileForDownload()}. diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManagerImpl.java index b3d84837c..82c7c47cb 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxFileManagerImpl.java @@ -1,6 +1,7 @@ package org.briarproject.bramble.mailbox; import org.briarproject.bramble.api.connection.ConnectionManager; +import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventListener; @@ -10,13 +11,18 @@ import org.briarproject.bramble.api.mailbox.MailboxDirectory; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.event.TransportActiveEvent; import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import org.briarproject.bramble.api.properties.TransportProperties; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.logging.Logger; @@ -30,6 +36,7 @@ import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleS import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID; import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH; +import static org.briarproject.bramble.util.IoUtils.delete; import static org.briarproject.bramble.util.LogUtils.logException; @ThreadSafe @@ -41,6 +48,7 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener { // Package access for testing static final String DOWNLOAD_DIR_NAME = "downloads"; + static final String UPLOAD_DIR_NAME = "uploads"; private final Executor ioExecutor; private final PluginManager pluginManager; @@ -67,14 +75,44 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener { @Override public File createTempFileForDownload() throws IOException { + return createTempFile(DOWNLOAD_DIR_NAME); + } + + @Override + public File createAndWriteTempFileForUpload(ContactId contactId, + OutgoingSessionRecord sessionRecord) throws IOException { + File f = createTempFile(UPLOAD_DIR_NAME); + // We shouldn't reach this point until the plugin has been started + SimplexPlugin plugin = + (SimplexPlugin) requireNonNull(pluginManager.getPlugin(ID)); + TransportProperties p = new TransportProperties(); + p.put(PROP_PATH, f.getAbsolutePath()); + TransportConnectionWriter writer = plugin.createWriter(p); + if (writer == null) { + delete(f); + throw new IOException(); + } + MailboxFileWriter decorated = new MailboxFileWriter(writer); + LOG.info("Writing file for upload"); + connectionManager.manageOutgoingConnection(contactId, ID, decorated, + sessionRecord); + if (decorated.awaitDisposal()) { + // An exception was thrown during the session - delete the file + delete(f); + throw new IOException(); + } + return f; + } + + private File createTempFile(String dirName) throws IOException { // Wait for orphaned files to be handled before creating new files try { orphanLatch.await(); } catch (InterruptedException e) { throw new IOException(e); } - File downloadDir = createDirectoryIfNeeded(DOWNLOAD_DIR_NAME); - return File.createTempFile("mailbox", ".tmp", downloadDir); + File dir = createDirectoryIfNeeded(dirName); + return File.createTempFile("mailbox", ".tmp", dir); } private File createDirectoryIfNeeded(String name) throws IOException { @@ -116,6 +154,8 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener { @Override public void eventOccurred(Event e) { + // Wait for the transport to become active before handling orphaned + // files so that we can get the plugin from the plugin manager if (e instanceof TransportActiveEvent) { TransportActiveEvent t = (TransportActiveEvent) e; if (t.getTransportId().equals(ID)) { @@ -127,17 +167,25 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener { /** * This method is called at startup, as soon as the plugin is started, to - * handle any files that were left in the download directory at the last - * shutdown. + * delete any files that were left in the upload directory at the last + * shutdown and handle any files that were left in the download directory. */ @IoExecutor private void handleOrphanedFiles() { try { + File uploadDir = createDirectoryIfNeeded(UPLOAD_DIR_NAME); + File[] orphanedUploads = uploadDir.listFiles(); + if (orphanedUploads != null) { + for (File f : orphanedUploads) delete(f); + } File downloadDir = createDirectoryIfNeeded(DOWNLOAD_DIR_NAME); - File[] orphans = downloadDir.listFiles(); - // Now that we've got the list of orphans, new files can be created + File[] orphanedDownloads = downloadDir.listFiles(); + // Now that we've got the list of orphaned downloads, new files + // can be created in the download directory orphanLatch.countDown(); - if (orphans != null) for (File f : orphans) handleDownloadedFile(f); + if (orphanedDownloads != null) { + for (File f : orphanedDownloads) handleDownloadedFile(f); + } } catch (IOException e) { logException(LOG, WARNING, e); } @@ -165,9 +213,58 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener { delegate.dispose(exception, recognised); if (isHandlingComplete(exception, recognised)) { LOG.info("Deleting downloaded file"); - if (!file.delete()) { - LOG.warning("Failed to delete downloaded file"); - } + delete(file); + } + } + } + + private static class MailboxFileWriter + implements TransportConnectionWriter { + + private final TransportConnectionWriter delegate; + private final BlockingQueue disposalResult = + new ArrayBlockingQueue<>(1); + + private MailboxFileWriter(TransportConnectionWriter delegate) { + this.delegate = delegate; + } + + @Override + public long getMaxLatency() { + return delegate.getMaxLatency(); + } + + @Override + public int getMaxIdleTime() { + return delegate.getMaxIdleTime(); + } + + @Override + public boolean isLossyAndCheap() { + return delegate.isLossyAndCheap(); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return delegate.getOutputStream(); + } + + @Override + public void dispose(boolean exception) throws IOException { + delegate.dispose(exception); + disposalResult.add(exception); + } + + /** + * Waits for the delegate to be disposed and returns true if an + * exception occurred. + */ + private boolean awaitDisposal() { + try { + return disposalResult.take(); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for disposal"); + return true; } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxUploadWorker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxUploadWorker.java new file mode 100644 index 000000000..e0f3496a1 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxUploadWorker.java @@ -0,0 +1,394 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.event.EventExecutor; +import org.briarproject.bramble.api.event.EventListener; +import org.briarproject.bramble.api.lifecycle.IoExecutor; +import org.briarproject.bramble.api.mailbox.MailboxFolderId; +import org.briarproject.bramble.api.mailbox.MailboxProperties; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; +import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; +import org.briarproject.bramble.api.sync.event.MessageSharedEvent; +import org.briarproject.bramble.api.sync.event.MessageToAckEvent; +import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver; +import org.briarproject.bramble.mailbox.MailboxApi.ApiException; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY; +import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; +import static org.briarproject.bramble.util.IoUtils.delete; +import static org.briarproject.bramble.util.LogUtils.logException; + +@ThreadSafe +@NotNullByDefault +class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver, + EventListener { + + /** + * When the worker is started it checks for data to send. If data is ready + * to send, the worker waits for a connectivity check, then writes and + * uploads a file and checks again for data to send. + *

+ * If data is due to be sent at some time in the future, the worker + * schedules a wakeup for that time and also listens for events indicating + * that new data may be ready to send. + *

+ * If there's no data to send, the worker listens for events indicating + * that new data may be ready to send. + */ + private enum State { + CREATED, + CHECKING_FOR_DATA, + WAITING_FOR_DATA, + CONNECTIVITY_CHECK, + WRITING_UPLOADING, + DESTROYED + } + + private static final Logger LOG = + getLogger(MailboxUploadWorker.class.getName()); + + /** + * When we're waiting for data to send and an event indicates that new data + * may have become available, wait this long before checking the DB. This + * should help to avoid creating lots of small files when several acks or + * messages become available to send in a short period (eg when reading a + * file downloaded from a mailbox). + *

+ * Package access for testing. + */ + static final long CHECK_DELAY_MS = 5_000; + + /** + * How long to wait before retrying when an exception occurs while writing + * a file. + *

+ * Package access for testing. + */ + static final long RETRY_DELAY_MS = MINUTES.toMillis(1); + + private final Executor ioExecutor; + private final DatabaseComponent db; + private final Clock clock; + private final TaskScheduler taskScheduler; + private final EventBus eventBus; + private final ConnectivityChecker connectivityChecker; + private final MailboxApiCaller mailboxApiCaller; + private final MailboxApi mailboxApi; + private final MailboxFileManager mailboxFileManager; + private final MailboxProperties mailboxProperties; + private final MailboxFolderId folderId; + private final ContactId contactId; + + private final Object lock = new Object(); + + @GuardedBy("lock") + private State state = State.CREATED; + + @GuardedBy("lock") + @Nullable + private Cancellable wakeupTask = null, checkTask = null, apiCall = null; + + @GuardedBy("lock") + @Nullable + private File file = null; + + MailboxUploadWorker(@IoExecutor Executor ioExecutor, + DatabaseComponent db, + Clock clock, + TaskScheduler taskScheduler, + EventBus eventBus, + ConnectivityChecker connectivityChecker, + MailboxApiCaller mailboxApiCaller, + MailboxApi mailboxApi, + MailboxFileManager mailboxFileManager, + MailboxProperties mailboxProperties, + MailboxFolderId folderId, + ContactId contactId) { + this.ioExecutor = ioExecutor; + this.db = db; + this.clock = clock; + this.taskScheduler = taskScheduler; + this.eventBus = eventBus; + this.connectivityChecker = connectivityChecker; + this.mailboxApiCaller = mailboxApiCaller; + this.mailboxApi = mailboxApi; + this.mailboxFileManager = mailboxFileManager; + this.mailboxProperties = mailboxProperties; + this.folderId = folderId; + this.contactId = contactId; + } + + @Override + public void start() { + LOG.info("Started"); + synchronized (lock) { + // Don't allow the worker to be reused + if (state != State.CREATED) throw new IllegalStateException(); + state = State.CHECKING_FOR_DATA; + } + ioExecutor.execute(this::checkForDataToSend); + } + + @Override + public void destroy() { + LOG.info("Destroyed"); + Cancellable wakeupTask, checkTask, apiCall; + File file; + synchronized (lock) { + state = State.DESTROYED; + wakeupTask = this.wakeupTask; + this.wakeupTask = null; + checkTask = this.checkTask; + this.checkTask = null; + apiCall = this.apiCall; + this.apiCall = null; + file = this.file; + this.file = null; + } + if (wakeupTask != null) wakeupTask.cancel(); + if (checkTask != null) checkTask.cancel(); + if (apiCall != null) apiCall.cancel(); + if (file != null) delete(file); + connectivityChecker.removeObserver(this); + eventBus.removeListener(this); + } + + @IoExecutor + private void checkForDataToSend() { + synchronized (lock) { + checkTask = null; + if (state != State.CHECKING_FOR_DATA) return; + } + LOG.info("Checking for data to send"); + try { + db.transaction(true, txn -> { + long nextSendTime; + if (db.containsAcksToSend(txn, contactId)) { + nextSendTime = 0L; + } else { + nextSendTime = db.getNextSendTime(txn, contactId, + MAX_LATENCY); + } + // Handle the result on the event executor to avoid races with + // incoming events + txn.attach(() -> handleNextSendTime(nextSendTime)); + }); + } catch (DbException e) { + logException(LOG, WARNING, e); + } + } + + @EventExecutor + private void handleNextSendTime(long nextSendTime) { + if (nextSendTime == Long.MAX_VALUE) { + // Nothing is sendable now or due to be sent in the future. Wait + // for an event indicating that new data may be ready to send + waitForDataToSend(); + } else { + // Work out the delay until data's ready to send (may be negative) + long delay = nextSendTime - clock.currentTimeMillis(); + if (delay > 0) { + // Schedule a wakeup when data will be ready to send. If an + // event is received in the meantime indicating that new data + // may be ready to send, we'll cancel the wakeup + scheduleWakeup(delay); + } else { + // Data is ready to send now + checkConnectivity(); + } + } + } + + @EventExecutor + private void waitForDataToSend() { + synchronized (lock) { + if (state != State.CHECKING_FOR_DATA) return; + state = State.WAITING_FOR_DATA; + LOG.info("Waiting for data to send"); + } + } + + @EventExecutor + private void scheduleWakeup(long delay) { + synchronized (lock) { + if (state != State.CHECKING_FOR_DATA) return; + state = State.WAITING_FOR_DATA; + if (LOG.isLoggable(INFO)) { + LOG.info("Scheduling wakeup in " + delay + " ms"); + } + wakeupTask = taskScheduler.schedule(this::wakeUp, ioExecutor, + delay, MILLISECONDS); + } + } + + @IoExecutor + private void wakeUp() { + LOG.info("Woke up"); + synchronized (lock) { + wakeupTask = null; + if (state != State.WAITING_FOR_DATA) return; + state = State.CHECKING_FOR_DATA; + } + checkForDataToSend(); + } + + @EventExecutor + private void checkConnectivity() { + synchronized (lock) { + if (state != State.CHECKING_FOR_DATA) return; + state = State.CONNECTIVITY_CHECK; + } + LOG.info("Checking connectivity"); + // Avoid leaking observer in case destroy() is called concurrently + // before observer is added + connectivityChecker.checkConnectivity(mailboxProperties, this); + boolean destroyed; + synchronized (lock) { + destroyed = state == State.DESTROYED; + } + if (destroyed) connectivityChecker.removeObserver(this); + } + + @Override + public void onConnectivityCheckSucceeded() { + LOG.info("Connectivity check succeeded"); + synchronized (lock) { + if (state != State.CONNECTIVITY_CHECK) return; + state = State.WRITING_UPLOADING; + } + ioExecutor.execute(this::writeAndUploadFile); + } + + @IoExecutor + private void writeAndUploadFile() { + synchronized (lock) { + if (state != State.WRITING_UPLOADING) return; + } + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); + File file; + try { + file = mailboxFileManager.createAndWriteTempFileForUpload( + contactId, sessionRecord); + } catch (IOException e) { + logException(LOG, WARNING, e); + // Try again after a delay + synchronized (lock) { + if (state != State.WRITING_UPLOADING) return; + state = State.CHECKING_FOR_DATA; + checkTask = taskScheduler.schedule(this::checkForDataToSend, + ioExecutor, RETRY_DELAY_MS, MILLISECONDS); + } + return; + } + boolean deleteFile = false; + synchronized (lock) { + if (state == State.WRITING_UPLOADING) { + this.file = file; + apiCall = mailboxApiCaller.retryWithBackoff( + new SimpleApiCall(() -> apiCallUploadFile(file, + sessionRecord))); + } else { + deleteFile = true; + } + } + if (deleteFile) delete(file); + } + + @IoExecutor + private void apiCallUploadFile(File file, + OutgoingSessionRecord sessionRecord) + throws IOException, ApiException { + synchronized (lock) { + if (state != State.WRITING_UPLOADING) return; + } + LOG.info("Uploading file"); + mailboxApi.addFile(mailboxProperties, folderId, file); + markMessagesSentOrAcked(sessionRecord); + synchronized (lock) { + if (state != State.WRITING_UPLOADING) return; + state = State.CHECKING_FOR_DATA; + apiCall = null; + this.file = null; + } + delete(file); + checkForDataToSend(); + } + + private void markMessagesSentOrAcked(OutgoingSessionRecord sessionRecord) { + Collection acked = sessionRecord.getAckedIds(); + Collection sent = sessionRecord.getSentIds(); + try { + db.transaction(false, txn -> { + if (!acked.isEmpty()) { + db.setAckSent(txn, contactId, acked); + } + if (!sent.isEmpty()) { + db.setMessagesSent(txn, contactId, sent, MAX_LATENCY); + } + }); + } catch (DbException e) { + logException(LOG, WARNING, e); + } + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof MessageToAckEvent) { + MessageToAckEvent m = (MessageToAckEvent) e; + if (m.getContactId().equals(contactId)) { + LOG.info("Message to ack"); + onDataToSend(); + } + } else if (e instanceof MessageSharedEvent) { + LOG.info("Message shared"); + onDataToSend(); + } else if (e instanceof GroupVisibilityUpdatedEvent) { + GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e; + if (g.getVisibility() == SHARED && + g.getAffectedContacts().contains(contactId)) { + LOG.info("Group shared"); + onDataToSend(); + } + } + } + + @EventExecutor + private void onDataToSend() { + Cancellable wakeupTask; + synchronized (lock) { + if (state != State.WAITING_FOR_DATA) return; + state = State.CHECKING_FOR_DATA; + wakeupTask = this.wakeupTask; + this.wakeupTask = null; + // Delay the check to avoid creating lots of small files + checkTask = taskScheduler.schedule(this::checkForDataToSend, + ioExecutor, CHECK_DELAY_MS, MILLISECONDS); + } + // If we had scheduled a wakeup when data was due to be sent, cancel it + if (wakeupTask != null) wakeupTask.cancel(); + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorkerTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorkerTest.java index 273c4f016..32ac944a0 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorkerTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/ContactMailboxDownloadWorkerTest.java @@ -1,5 +1,6 @@ package org.briarproject.bramble.mailbox; +import org.briarproject.bramble.api.Cancellable; import org.briarproject.bramble.api.mailbox.MailboxFileId; import org.briarproject.bramble.api.mailbox.MailboxProperties; import org.briarproject.bramble.mailbox.MailboxApi.MailboxFile; @@ -7,6 +8,7 @@ import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException; import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.CaptureArgumentAction; import org.jmock.Expectations; +import org.jmock.lib.action.DoAllAction; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -36,6 +38,7 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase { private final MailboxApi mailboxApi = context.mock(MailboxApi.class); private final MailboxFileManager mailboxFileManager = context.mock(MailboxFileManager.class); + private final Cancellable apiCall = context.mock(Cancellable.class); private final MailboxProperties mailboxProperties = getMailboxProperties(false, CLIENT_SUPPORTS); @@ -96,30 +99,36 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase { // When the connectivity check succeeds, a list-inbox task should be // started for the first download cycle - AtomicReference listTask = new AtomicReference<>(null); + AtomicReference listTask = new AtomicReference<>(); context.checking(new Expectations() {{ oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); - will(new CaptureArgumentAction<>(listTask, ApiCall.class, 0)); + will(new DoAllAction( + new CaptureArgumentAction<>(listTask, ApiCall.class, 0), + returnValue(apiCall) + )); }}); worker.onConnectivityCheckSucceeded(); // When the list-inbox tasks runs and finds some files to download, // it should start a download task for the first file - AtomicReference downloadTask = new AtomicReference<>(null); + AtomicReference downloadTask = new AtomicReference<>(); context.checking(new Expectations() {{ oneOf(mailboxApi).getFiles(mailboxProperties, requireNonNull(mailboxProperties.getInboxId())); will(returnValue(files)); oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); - will(new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0)); + will(new DoAllAction( + new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0), + returnValue(apiCall) + )); }}); assertFalse(listTask.get().callApi()); // When the first download task runs it should download the file to the // location provided by the file manager and start a delete task - AtomicReference deleteTask = new AtomicReference<>(null); + AtomicReference deleteTask = new AtomicReference<>(); context.checking(new Expectations() {{ oneOf(mailboxFileManager).createTempFileForDownload(); will(returnValue(tempFile)); @@ -128,7 +137,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase { file1.name, tempFile); oneOf(mailboxFileManager).handleDownloadedFile(tempFile); oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); - will(new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0)); + will(new DoAllAction( + new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0), + returnValue(apiCall) + )); }}); assertFalse(downloadTask.get().callApi()); @@ -140,7 +152,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase { requireNonNull(mailboxProperties.getInboxId()), file1.name); will(throwException(new TolerableFailureException())); oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); - will(new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0)); + will(new DoAllAction( + new CaptureArgumentAction<>(downloadTask, ApiCall.class, 0), + returnValue(apiCall) + )); }}); assertFalse(deleteTask.get().callApi()); @@ -155,7 +170,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase { file2.name, tempFile); oneOf(mailboxFileManager).handleDownloadedFile(tempFile); oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); - will(new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0)); + will(new DoAllAction( + new CaptureArgumentAction<>(deleteTask, ApiCall.class, 0), + returnValue(apiCall) + )); }}); assertFalse(downloadTask.get().callApi()); @@ -168,7 +186,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase { requireNonNull(mailboxProperties.getInboxId()), file2.name); will(throwException(new TolerableFailureException())); oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); - will(new CaptureArgumentAction<>(listTask, ApiCall.class, 0)); + will(new DoAllAction( + new CaptureArgumentAction<>(listTask, ApiCall.class, 0), + returnValue(apiCall) + )); }}); assertFalse(deleteTask.get().callApi()); @@ -188,7 +209,10 @@ public class ContactMailboxDownloadWorkerTest extends BrambleMockTestCase { // be started for the second download cycle context.checking(new Expectations() {{ oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); - will(new CaptureArgumentAction<>(listTask, ApiCall.class, 0)); + will(new DoAllAction( + new CaptureArgumentAction<>(listTask, ApiCall.class, 0), + returnValue(apiCall) + )); }}); worker.onTorReachable(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxFileManagerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxFileManagerImplTest.java index c67ca2dd4..12b5c13ec 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxFileManagerImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxFileManagerImplTest.java @@ -2,16 +2,20 @@ package org.briarproject.bramble.mailbox; import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.connection.ConnectionManager.TagController; +import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState; import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.event.TransportActiveEvent; import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import org.briarproject.bramble.api.properties.TransportProperties; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.CaptureArgumentAction; +import org.briarproject.bramble.test.ConsumeArgumentAction; import org.briarproject.bramble.test.RunAction; import org.jmock.Expectations; import org.jmock.lib.action.DoAllAction; @@ -20,6 +24,7 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; @@ -28,11 +33,14 @@ import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleS import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID; import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH; import static org.briarproject.bramble.mailbox.MailboxFileManagerImpl.DOWNLOAD_DIR_NAME; +import static org.briarproject.bramble.mailbox.MailboxFileManagerImpl.UPLOAD_DIR_NAME; import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory; +import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getTestDirectory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class MailboxFileManagerImplTest extends BrambleMockTestCase { @@ -47,6 +55,10 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase { private final SimplexPlugin plugin = context.mock(SimplexPlugin.class); private final TransportConnectionReader transportConnectionReader = context.mock(TransportConnectionReader.class); + private final TransportConnectionWriter transportConnectionWriter = + context.mock(TransportConnectionWriter.class); + + private final ContactId contactId = getContactId(); private File mailboxDir; private MailboxFileManagerImpl manager; @@ -65,17 +77,25 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase { @Test public void testHandlesOrphanedFilesAtStartup() throws Exception { - // Create an orphaned file, left behind at the previous shutdown + // Create an orphaned upload, left behind at the previous shutdown + File uploadDir = new File(mailboxDir, UPLOAD_DIR_NAME); + //noinspection ResultOfMethodCallIgnored + uploadDir.mkdirs(); + File orphanedUpload = new File(uploadDir, "orphan"); + assertTrue(orphanedUpload.createNewFile()); + + // Create an orphaned download, left behind at the previous shutdown File downloadDir = new File(mailboxDir, DOWNLOAD_DIR_NAME); //noinspection ResultOfMethodCallIgnored downloadDir.mkdirs(); - File orphan = new File(downloadDir, "orphan"); - assertTrue(orphan.createNewFile()); + File orphanedDownload = new File(downloadDir, "orphan"); + assertTrue(orphanedDownload.createNewFile()); TransportProperties props = new TransportProperties(); - props.put(PROP_PATH, orphan.getAbsolutePath()); + props.put(PROP_PATH, orphanedDownload.getAbsolutePath()); - // When the plugin becomes active the orphaned file should be handled + // When the plugin becomes active the orphaned upload should be deleted + // and the orphaned download should be handled context.checking(new Expectations() {{ oneOf(ioExecutor).execute(with(any(Runnable.class))); will(new RunAction()); @@ -90,10 +110,12 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase { }}); manager.eventOccurred(new TransportActiveEvent(ID)); + + assertFalse(orphanedUpload.exists()); } @Test - public void testDeletesFileWhenReadSucceeds() throws Exception { + public void testDeletesDownloadedFileWhenReadSucceeds() throws Exception { expectCheckForOrphans(); manager.eventOccurred(new TransportActiveEvent(ID)); @@ -102,7 +124,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase { new AtomicReference<>(null); AtomicReference controller = new AtomicReference<>(null); - expectPassFileToConnectionManager(f, reader, controller); + expectPassDownloadedFileToConnectionManager(f, reader, controller); manager.handleDownloadedFile(f); // The read is successful, so the tag controller should allow the tag @@ -117,29 +139,117 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase { } @Test - public void testDeletesFileWhenTagIsNotRecognised() throws Exception { - testDeletesFile(false, RUNNING, false); - } - - @Test - public void testDeletesFileWhenReadFails() throws Exception { - testDeletesFile(true, RUNNING, false); - } - - @Test - public void testDoesNotDeleteFileWhenTagIsNotRecognisedAtShutdown() + public void testDeletesDownloadedFileWhenTagIsNotRecognised() throws Exception { - testDeletesFile(false, STOPPING, true); + testDeletesDownloadedFile(false, RUNNING, false); } @Test - public void testDoesNotDeleteFileWhenReadFailsAtShutdown() - throws Exception { - testDeletesFile(true, STOPPING, true); + public void testDeletesDownloadedFileWhenReadFails() throws Exception { + testDeletesDownloadedFile(true, RUNNING, false); } - private void testDeletesFile(boolean recognised, LifecycleState state, - boolean fileExists) throws Exception { + @Test + public void testDoesNotDeleteDownloadedFileWhenTagIsNotRecognisedAtShutdown() + throws Exception { + testDeletesDownloadedFile(false, STOPPING, true); + } + + @Test + public void testDoesNotDeleteDownloadedFileWhenReadFailsAtShutdown() + throws Exception { + testDeletesDownloadedFile(true, STOPPING, true); + } + + @Test(expected = IOException.class) + public void testThrowsExceptionIfPluginFailsToCreateWriter() + throws Exception { + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); + + expectCheckForOrphans(); + manager.eventOccurred(new TransportActiveEvent(ID)); + + context.checking(new Expectations() {{ + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).createWriter(with(any(TransportProperties.class))); + will(returnValue(null)); + }}); + + manager.createAndWriteTempFileForUpload(contactId, sessionRecord); + } + + @Test(expected = IOException.class) + public void testThrowsExceptionIfSessionFailsWithException() + throws Exception { + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); + + expectCheckForOrphans(); + manager.eventOccurred(new TransportActiveEvent(ID)); + + context.checking(new Expectations() {{ + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).createWriter(with(any(TransportProperties.class))); + will(returnValue(transportConnectionWriter)); + oneOf(transportConnectionWriter).dispose(true); + oneOf(connectionManager).manageOutgoingConnection(with(contactId), + with(ID), with(any(TransportConnectionWriter.class)), + with(sessionRecord)); + // The session fails with an exception. We need to use an action + // for this, as createAndWriteTempFileForUpload() waits for it to + // happen before returning + will(new ConsumeArgumentAction<>(TransportConnectionWriter.class, 2, + writer -> { + try { + writer.dispose(true); + } catch (IOException e) { + fail(); + } + } + )); + }}); + + manager.createAndWriteTempFileForUpload(contactId, sessionRecord); + } + + @Test + public void testReturnsFileIfSessionSucceeds() throws Exception { + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); + + expectCheckForOrphans(); + manager.eventOccurred(new TransportActiveEvent(ID)); + + context.checking(new Expectations() {{ + oneOf(pluginManager).getPlugin(ID); + will(returnValue(plugin)); + oneOf(plugin).createWriter(with(any(TransportProperties.class))); + will(returnValue(transportConnectionWriter)); + oneOf(transportConnectionWriter).dispose(false); + oneOf(connectionManager).manageOutgoingConnection(with(contactId), + with(ID), with(any(TransportConnectionWriter.class)), + with(sessionRecord)); + // The session succeeds. We need to use an action for this, as + // createAndWriteTempFileForUpload() waits for it to happen before + // returning + will(new ConsumeArgumentAction<>(TransportConnectionWriter.class, 2, + writer -> { + try { + writer.dispose(false); + } catch (IOException e) { + fail(); + } + } + )); + }}); + + File f = manager.createAndWriteTempFileForUpload(contactId, + sessionRecord); + assertTrue(f.exists()); + } + + private void testDeletesDownloadedFile(boolean recognised, + LifecycleState state, boolean fileExists) throws Exception { expectCheckForOrphans(); manager.eventOccurred(new TransportActiveEvent(ID)); @@ -148,7 +258,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase { new AtomicReference<>(null); AtomicReference controller = new AtomicReference<>(null); - expectPassFileToConnectionManager(f, reader, controller); + expectPassDownloadedFileToConnectionManager(f, reader, controller); manager.handleDownloadedFile(f); context.checking(new Expectations() {{ @@ -169,7 +279,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase { }}); } - private void expectPassFileToConnectionManager(File f, + private void expectPassDownloadedFileToConnectionManager(File f, AtomicReference reader, AtomicReference controller) { TransportProperties props = new TransportProperties(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxPairingTaskImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxPairingTaskImplTest.java index 46117fb42..abb5e0ddc 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxPairingTaskImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxPairingTaskImplTest.java @@ -13,7 +13,6 @@ import org.briarproject.bramble.api.mailbox.MailboxSettingsManager; import org.briarproject.bramble.api.mailbox.MailboxUpdate; import org.briarproject.bramble.api.mailbox.MailboxUpdateManager; import org.briarproject.bramble.api.mailbox.MailboxVersion; -import org.briarproject.bramble.api.mailbox.event.OwnMailboxConnectionStatusEvent; import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.DbExpectations; @@ -33,7 +32,6 @@ import static java.util.Collections.singletonList; import static org.briarproject.bramble.test.TestUtils.getContact; import static org.briarproject.bramble.test.TestUtils.getRandomBytes; import static org.briarproject.bramble.test.TestUtils.getRandomId; -import static org.briarproject.bramble.test.TestUtils.hasEvent; import static org.briarproject.bramble.util.StringUtils.getRandomString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -138,7 +136,6 @@ public class MailboxPairingTaskImplTest extends BrambleMockTestCase { i.getAndIncrement(); }); task.run(); - hasEvent(txn, OwnMailboxConnectionStatusEvent.class); } @Test diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxSettingsManagerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxSettingsManagerImplTest.java index 0cdf08cdb..8cf7d03e6 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxSettingsManagerImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxSettingsManagerImplTest.java @@ -163,7 +163,7 @@ public class MailboxSettingsManagerImplTest extends BrambleMockTestCase { }}); manager.recordSuccessfulConnection(txn, now); - hasEvent(txn, OwnMailboxConnectionStatusEvent.class); + assertTrue(hasEvent(txn, OwnMailboxConnectionStatusEvent.class)); } @Test diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxUploadWorkerTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxUploadWorkerTest.java new file mode 100644 index 000000000..048194983 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/MailboxUploadWorkerTest.java @@ -0,0 +1,465 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.Transaction; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.mailbox.MailboxFolderId; +import org.briarproject.bramble.api.mailbox.MailboxProperties; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; +import org.briarproject.bramble.api.sync.event.MessageSharedEvent; +import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.test.BrambleMockTestCase; +import org.briarproject.bramble.test.CaptureArgumentAction; +import org.briarproject.bramble.test.ConsumeArgumentAction; +import org.briarproject.bramble.test.DbExpectations; +import org.briarproject.bramble.test.RunAction; +import org.jmock.Expectations; +import org.jmock.lib.action.DoAllAction; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.singletonList; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY; +import static org.briarproject.bramble.mailbox.MailboxUploadWorker.CHECK_DELAY_MS; +import static org.briarproject.bramble.mailbox.MailboxUploadWorker.RETRY_DELAY_MS; +import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory; +import static org.briarproject.bramble.test.TestUtils.getContactId; +import static org.briarproject.bramble.test.TestUtils.getMailboxProperties; +import static org.briarproject.bramble.test.TestUtils.getRandomId; +import static org.briarproject.bramble.test.TestUtils.getTestDirectory; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MailboxUploadWorkerTest extends BrambleMockTestCase { + + private final Executor ioExecutor = context.mock(Executor.class); + private final DatabaseComponent db = context.mock(DatabaseComponent.class); + private final Clock clock = context.mock(Clock.class); + private final TaskScheduler taskScheduler = + context.mock(TaskScheduler.class); + private final EventBus eventBus = context.mock(EventBus.class); + private final ConnectivityChecker connectivityChecker = + context.mock(ConnectivityChecker.class); + private final MailboxApiCaller mailboxApiCaller = + context.mock(MailboxApiCaller.class); + private final MailboxApi mailboxApi = context.mock(MailboxApi.class); + private final MailboxFileManager mailboxFileManager = + context.mock(MailboxFileManager.class); + private final Cancellable apiCall = + context.mock(Cancellable.class, "apiCall"); + private final Cancellable wakeupTask = + context.mock(Cancellable.class, "wakeupTask"); + private final Cancellable checkTask = + context.mock(Cancellable.class, "checkTask"); + + private final MailboxProperties mailboxProperties = + getMailboxProperties(false, CLIENT_SUPPORTS); + private final long now = System.currentTimeMillis(); + private final MailboxFolderId folderId = new MailboxFolderId(getRandomId()); + private final ContactId contactId = getContactId(); + private final MessageId ackedId = new MessageId(getRandomId()); + private final MessageId sentId = new MessageId(getRandomId()); + private final MessageId newMessageId = new MessageId(getRandomId()); + + private File testDir, tempFile; + private MailboxUploadWorker worker; + + @Before + public void setUp() { + testDir = getTestDirectory(); + tempFile = new File(testDir, "temp"); + worker = new MailboxUploadWorker(ioExecutor, db, clock, taskScheduler, + eventBus, connectivityChecker, mailboxApiCaller, mailboxApi, + mailboxFileManager, mailboxProperties, folderId, contactId); + } + + @After + public void tearDown() { + deleteTestDirectory(testDir); + } + + @Test + public void testChecksForDataWhenStartedAndRemovesObserverWhenDestroyed() + throws Exception { + // When the worker is started it should check for data to send + expectRunTaskOnIoExecutor(); + expectCheckForDataToSendNoDataWaiting(); + + worker.start(); + + // When the worker is destroyed it should remove the connectivity + // observer and event listener + expectRemoveObserverAndListener(); + + worker.destroy(); + } + + @Test + public void testChecksConnectivityWhenStartedIfDataIsReady() + throws Exception { + Transaction recordTxn = new Transaction(null, false); + + // When the worker is started it should check for data to send. As + // there's data ready to send immediately, the worker should start a + // connectivity check + expectRunTaskOnIoExecutor(); + expectCheckForDataToSendAndStartConnectivityCheck(); + + worker.start(); + + // Create the temporary file so we can test that it gets deleted + assertTrue(testDir.mkdirs()); + assertTrue(tempFile.createNewFile()); + + // When the connectivity check succeeds, the worker should write a file + // and start an upload task + expectRunTaskOnIoExecutor(); + AtomicReference upload = new AtomicReference<>(); + context.checking(new Expectations() {{ + oneOf(mailboxFileManager).createAndWriteTempFileForUpload( + with(contactId), with(any(OutgoingSessionRecord.class))); + will(new DoAllAction( + // Record some IDs as acked and sent + new ConsumeArgumentAction<>(OutgoingSessionRecord.class, 1, + record -> { + record.onAckSent(singletonList(ackedId)); + record.onMessageSent(sentId); + }), + returnValue(tempFile) + )); + oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); + will(new DoAllAction( + new CaptureArgumentAction<>(upload, ApiCall.class, 0), + returnValue(apiCall) + )); + }}); + + worker.onConnectivityCheckSucceeded(); + + // When the upload task runs, it should upload the file, record + // the acked/sent messages in the DB, and check for more data to send + context.checking(new DbExpectations() {{ + oneOf(mailboxApi).addFile(mailboxProperties, folderId, tempFile); + oneOf(db).transaction(with(false), withDbRunnable(recordTxn)); + oneOf(db).setAckSent(recordTxn, contactId, singletonList(ackedId)); + oneOf(db).setMessagesSent(recordTxn, contactId, + singletonList(sentId), MAX_LATENCY); + }}); + expectCheckForDataToSendNoDataWaiting(); + + assertFalse(upload.get().callApi()); + + // When the worker is destroyed it should remove the connectivity + // observer and event listener + expectRemoveObserverAndListener(); + + worker.destroy(); + + // The file should have been deleted + assertFalse(tempFile.exists()); + } + + @Test + public void testCancelsApiCallWhenDestroyed() throws Exception { + // When the worker is started it should check for data to send. As + // there's data ready to send immediately, the worker should start a + // connectivity check + expectRunTaskOnIoExecutor(); + expectCheckForDataToSendAndStartConnectivityCheck(); + + worker.start(); + + // Create the temporary file so we can test that it gets deleted + assertTrue(testDir.mkdirs()); + assertTrue(tempFile.createNewFile()); + + // When the connectivity check succeeds, the worker should write a file + // and start an upload task + expectRunTaskOnIoExecutor(); + AtomicReference upload = new AtomicReference<>(); + context.checking(new Expectations() {{ + oneOf(mailboxFileManager).createAndWriteTempFileForUpload( + with(contactId), with(any(OutgoingSessionRecord.class))); + will(new DoAllAction( + // Record some IDs as acked and sent + new ConsumeArgumentAction<>(OutgoingSessionRecord.class, 1, + record -> { + record.onAckSent(singletonList(ackedId)); + record.onMessageSent(sentId); + }), + returnValue(tempFile) + )); + oneOf(mailboxApiCaller).retryWithBackoff(with(any(ApiCall.class))); + will(new DoAllAction( + new CaptureArgumentAction<>(upload, ApiCall.class, 0), + returnValue(apiCall) + )); + }}); + + worker.onConnectivityCheckSucceeded(); + + // When the worker is destroyed it should remove the connectivity + // observer and event listener and cancel the upload task + context.checking(new Expectations() {{ + oneOf(apiCall).cancel(); + }}); + expectRemoveObserverAndListener(); + + worker.destroy(); + + // The file should have been deleted + assertFalse(tempFile.exists()); + + // If the upload task runs anyway (cancellation came too late), it + // should return early when it finds the state has changed + assertFalse(upload.get().callApi()); + } + + @Test + public void testSchedulesWakeupWhenStartedIfDataIsNotReady() + throws Exception { + // When the worker is started it should check for data to send. As + // the data isn't ready to send immediately, the worker should + // schedule a wakeup + expectRunTaskOnIoExecutor(); + AtomicReference wakeup = new AtomicReference<>(); + expectCheckForDataToSendAndScheduleWakeup(wakeup); + + worker.start(); + + // When the wakeup task runs it should check for data to send + expectCheckForDataToSendNoDataWaiting(); + + wakeup.get().run(); + + // When the worker is destroyed it should remove the connectivity + // observer and event listener + expectRemoveObserverAndListener(); + + worker.destroy(); + } + + @Test + public void testCancelsWakeupIfDestroyedBeforeWakingUp() throws Exception { + // When the worker is started it should check for data to send. As + // the data isn't ready to send immediately, the worker should + // schedule a wakeup + expectRunTaskOnIoExecutor(); + AtomicReference wakeup = new AtomicReference<>(); + expectCheckForDataToSendAndScheduleWakeup(wakeup); + + worker.start(); + + // When the worker is destroyed it should cancel the wakeup and + // remove the connectivity observer and event listener + context.checking(new Expectations() {{ + oneOf(wakeupTask).cancel(); + }}); + expectRemoveObserverAndListener(); + + worker.destroy(); + + // If the wakeup task runs anyway (cancellation came too late), it + // should return early without doing anything + wakeup.get().run(); + } + + @Test + public void testCancelsWakeupIfEventIsReceivedBeforeWakingUp() + throws Exception { + // When the worker is started it should check for data to send. As + // the data isn't ready to send immediately, the worker should + // schedule a wakeup + expectRunTaskOnIoExecutor(); + AtomicReference wakeup = new AtomicReference<>(); + expectCheckForDataToSendAndScheduleWakeup(wakeup); + + worker.start(); + + // Before the wakeup task runs, the worker receives an event that + // indicates new data may be available. The worker should cancel the + // wakeup task and schedule a check for new data after a short delay + AtomicReference check = new AtomicReference<>(); + expectScheduleCheck(check, CHECK_DELAY_MS); + context.checking(new Expectations() {{ + oneOf(wakeupTask).cancel(); + }}); + + worker.eventOccurred(new MessageSharedEvent(newMessageId)); + + // If the wakeup task runs anyway (cancellation came too late), it + // should return early when it finds the state has changed + wakeup.get().run(); + + // Before the check task runs, the worker receives another event that + // indicates new data may be available. The event should be ignored, + // as a check for new data has already been scheduled + worker.eventOccurred(new MessageSharedEvent(newMessageId)); + + // When the check task runs, it should check for new data + expectCheckForDataToSendNoDataWaiting(); + + check.get().run(); + + // When the worker is destroyed it should remove the connectivity + // observer and event listener + expectRemoveObserverAndListener(); + + worker.destroy(); + } + + @Test + public void testCancelsCheckWhenDestroyed() throws Exception { + // When the worker is started it should check for data to send + expectRunTaskOnIoExecutor(); + expectCheckForDataToSendNoDataWaiting(); + + worker.start(); + + // The worker receives an event that indicates new data may be + // available. The worker should schedule a check for new data after + // a short delay + AtomicReference check = new AtomicReference<>(); + expectScheduleCheck(check, CHECK_DELAY_MS); + + worker.eventOccurred(new MessageSharedEvent(newMessageId)); + + // When the worker is destroyed it should cancel the check and + // remove the connectivity observer and event listener + context.checking(new Expectations() {{ + oneOf(checkTask).cancel(); + }}); + expectRemoveObserverAndListener(); + + worker.destroy(); + + // If the check runs anyway (cancellation came too late), it should + // return early when it finds the state has changed + check.get().run(); + } + + @Test + public void testRetriesAfterDelayIfExceptionOccursWhileWritingFile() + throws Exception { + // When the worker is started it should check for data to send. As + // there's data ready to send immediately, the worker should start a + // connectivity check + expectRunTaskOnIoExecutor(); + expectCheckForDataToSendAndStartConnectivityCheck(); + + worker.start(); + + // When the connectivity check succeeds, the worker should try to + // write a file. This fails with an exception, so the worker should + // retry by scheduling a check for new data after a short delay + expectRunTaskOnIoExecutor(); + AtomicReference check = new AtomicReference<>(); + context.checking(new Expectations() {{ + oneOf(mailboxFileManager).createAndWriteTempFileForUpload( + with(contactId), with(any(OutgoingSessionRecord.class))); + will(throwException(new IOException())); // Oh noes! + }}); + expectScheduleCheck(check, RETRY_DELAY_MS); + + worker.onConnectivityCheckSucceeded(); + + // When the check task runs it should check for new data + expectCheckForDataToSendNoDataWaiting(); + + check.get().run(); + + // When the worker is destroyed it should remove the connectivity + // observer and event listener + expectRemoveObserverAndListener(); + + worker.destroy(); + } + + private void expectRunTaskOnIoExecutor() { + context.checking(new Expectations() {{ + oneOf(ioExecutor).execute(with(any(Runnable.class))); + will(new RunAction()); + }}); + } + + private void expectCheckForDataToSendNoDataWaiting() throws Exception { + Transaction txn = new Transaction(null, true); + + context.checking(new DbExpectations() {{ + oneOf(db).transaction(with(true), withDbRunnable(txn)); + oneOf(db).containsAcksToSend(txn, contactId); + will(returnValue(false)); + oneOf(db).getNextSendTime(txn, contactId, MAX_LATENCY); + will(returnValue(Long.MAX_VALUE)); // No data waiting + }}); + } + + private void expectCheckForDataToSendAndScheduleWakeup( + AtomicReference wakeup) throws Exception { + Transaction txn = new Transaction(null, true); + + context.checking(new DbExpectations() {{ + oneOf(db).transaction(with(true), withDbRunnable(txn)); + oneOf(db).containsAcksToSend(txn, contactId); + will(returnValue(false)); + oneOf(db).getNextSendTime(txn, contactId, MAX_LATENCY); + will(returnValue(now + 1234L)); // Data waiting but not ready + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(1234L), with(MILLISECONDS)); + will(new DoAllAction( + new CaptureArgumentAction<>(wakeup, Runnable.class, 0), + returnValue(wakeupTask) + )); + }}); + } + + private void expectCheckForDataToSendAndStartConnectivityCheck() + throws Exception { + Transaction txn = new Transaction(null, true); + + context.checking(new DbExpectations() {{ + oneOf(db).transaction(with(true), withDbRunnable(txn)); + oneOf(db).containsAcksToSend(txn, contactId); + will(returnValue(false)); + oneOf(db).getNextSendTime(txn, contactId, MAX_LATENCY); + will(returnValue(0L)); // Data ready to send + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(connectivityChecker).checkConnectivity(mailboxProperties, + worker); + }}); + } + + private void expectScheduleCheck(AtomicReference check, + long delay) { + context.checking(new Expectations() {{ + oneOf(taskScheduler).schedule(with(any(Runnable.class)), + with(ioExecutor), with(delay), with(MILLISECONDS)); + will(new DoAllAction( + new CaptureArgumentAction<>(check, Runnable.class, 0), + returnValue(checkTask) + )); + }}); + } + + private void expectRemoveObserverAndListener() { + context.checking(new Expectations() {{ + oneOf(connectivityChecker).removeObserver(worker); + oneOf(eventBus).removeListener(worker); + }}); + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/ConsumeArgumentAction.java b/bramble-core/src/test/java/org/briarproject/bramble/test/ConsumeArgumentAction.java new file mode 100644 index 000000000..2fa78bbc7 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/ConsumeArgumentAction.java @@ -0,0 +1,32 @@ +package org.briarproject.bramble.test; + +import org.briarproject.bramble.api.Consumer; +import org.hamcrest.Description; +import org.jmock.api.Action; +import org.jmock.api.Invocation; + +public class ConsumeArgumentAction implements Action { + + private final Class capturedClass; + private final int index; + private final Consumer consumer; + + public ConsumeArgumentAction(Class capturedClass, int index, + Consumer consumer) { + this.capturedClass = capturedClass; + this.index = index; + this.consumer = consumer; + } + + @Override + public Object invoke(Invocation invocation) { + consumer.accept(capturedClass.cast(invocation.getParameter(index))); + return null; + } + + @Override + public void describeTo(Description description) { + description.appendText("passes an argument to a consumer"); + } + +}