Add creation of files for upload by MailboxFileManager.

This commit is contained in:
akwizgran
2022-06-15 16:52:55 +01:00
parent b7b253cf24
commit a2fb388aa6
5 changed files with 289 additions and 34 deletions

View File

@@ -1,6 +1,8 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
import java.io.File;
import java.io.IOException;
@@ -16,6 +18,14 @@ interface MailboxFileManager {
*/
File createTempFileForDownload() throws IOException;
/**
* Creates a file to be uploaded to the given contact and writes any
* waiting data to the file. The IDs of any messages sent or acked will
* be added to the given {@link OutgoingSessionRecord}.
*/
File createAndWriteTempFileForUpload(ContactId contactId,
OutgoingSessionRecord sessionRecord) throws IOException;
/**
* Handles a file that has been downloaded. The file should be created
* with {@link #createTempFileForDownload()}.

View File

@@ -1,6 +1,7 @@
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
@@ -10,13 +11,18 @@ import org.briarproject.bramble.api.mailbox.MailboxDirectory;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
import org.briarproject.bramble.api.plugin.event.TransportActiveEvent;
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -30,6 +36,7 @@ import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleS
import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
import static org.briarproject.bramble.util.IoUtils.delete;
import static org.briarproject.bramble.util.LogUtils.logException;
@ThreadSafe
@@ -41,6 +48,7 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
// Package access for testing
static final String DOWNLOAD_DIR_NAME = "downloads";
static final String UPLOAD_DIR_NAME = "uploads";
private final Executor ioExecutor;
private final PluginManager pluginManager;
@@ -67,14 +75,44 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
@Override
public File createTempFileForDownload() throws IOException {
return createTempFile(DOWNLOAD_DIR_NAME);
}
@Override
public File createAndWriteTempFileForUpload(ContactId contactId,
OutgoingSessionRecord sessionRecord) throws IOException {
// We shouldn't reach this point until the plugin has been started
SimplexPlugin plugin =
(SimplexPlugin) requireNonNull(pluginManager.getPlugin(ID));
File f = createTempFile(UPLOAD_DIR_NAME);
TransportProperties p = new TransportProperties();
p.put(PROP_PATH, f.getAbsolutePath());
TransportConnectionWriter writer = plugin.createWriter(p);
if (writer == null) {
delete(f);
throw new IOException();
}
MailboxFileWriter decorated = new MailboxFileWriter(writer);
LOG.info("Writing file for upload");
connectionManager.manageOutgoingConnection(contactId, ID, decorated,
sessionRecord);
if (decorated.awaitDisposal()) {
// An exception was thrown during the session - delete the file
delete(f);
throw new IOException();
}
return f;
}
private File createTempFile(String dirName) throws IOException {
// Wait for orphaned files to be handled before creating new files
try {
orphanLatch.await();
} catch (InterruptedException e) {
throw new IOException(e);
}
File downloadDir = createDirectoryIfNeeded(DOWNLOAD_DIR_NAME);
return File.createTempFile("mailbox", ".tmp", downloadDir);
File dir = createDirectoryIfNeeded(dirName);
return File.createTempFile("mailbox", ".tmp", dir);
}
private File createDirectoryIfNeeded(String name) throws IOException {
@@ -116,6 +154,8 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
@Override
public void eventOccurred(Event e) {
// Wait for the transport to become active before handling orphaned
// files so that we can get the plugin from the plugin manager
if (e instanceof TransportActiveEvent) {
TransportActiveEvent t = (TransportActiveEvent) e;
if (t.getTransportId().equals(ID)) {
@@ -127,17 +167,29 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
/**
* This method is called at startup, as soon as the plugin is started, to
* handle any files that were left in the download directory at the last
* shutdown.
* delete any files that were left in the upload directory at the last
* shutdown and handle any files that were left in the download directory.
*/
@IoExecutor
private void handleOrphanedFiles() {
try {
File uploadDir = createDirectoryIfNeeded(UPLOAD_DIR_NAME);
File[] orphanedUploads = uploadDir.listFiles();
if (orphanedUploads != null) {
for (File f : orphanedUploads) {
if (!f.delete()) {
LOG.warning("Failed to delete orphaned upload");
}
}
}
File downloadDir = createDirectoryIfNeeded(DOWNLOAD_DIR_NAME);
File[] orphans = downloadDir.listFiles();
// Now that we've got the list of orphans, new files can be created
File[] orphanedDownloads = downloadDir.listFiles();
// Now that we've got the list of orphaned downloads, new files
// can be created in the download directory
orphanLatch.countDown();
if (orphans != null) for (File f : orphans) handleDownloadedFile(f);
if (orphanedDownloads != null) {
for (File f : orphanedDownloads) handleDownloadedFile(f);
}
} catch (IOException e) {
logException(LOG, WARNING, e);
}
@@ -171,4 +223,55 @@ class MailboxFileManagerImpl implements MailboxFileManager, EventListener {
}
}
}
private static class MailboxFileWriter
implements TransportConnectionWriter {
private final TransportConnectionWriter delegate;
private final BlockingQueue<Boolean> disposalResult =
new ArrayBlockingQueue<>(1);
private MailboxFileWriter(TransportConnectionWriter delegate) {
this.delegate = delegate;
}
@Override
public long getMaxLatency() {
return delegate.getMaxLatency();
}
@Override
public int getMaxIdleTime() {
return delegate.getMaxIdleTime();
}
@Override
public boolean isLossyAndCheap() {
return delegate.isLossyAndCheap();
}
@Override
public OutputStream getOutputStream() throws IOException {
return delegate.getOutputStream();
}
@Override
public void dispose(boolean exception) throws IOException {
delegate.dispose(exception);
disposalResult.add(exception);
}
/**
* Waits for the delegate to be disposed and returns true if an
* exception occurred.
*/
private boolean awaitDisposal() {
try {
return disposalResult.take();
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for disposal");
return true;
}
}
}
}

View File

@@ -2,16 +2,20 @@ package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.connection.ConnectionManager.TagController;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
import org.briarproject.bramble.api.plugin.event.TransportActiveEvent;
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.CaptureArgumentAction;
import org.briarproject.bramble.test.ConsumeArgumentAction;
import org.briarproject.bramble.test.RunAction;
import org.jmock.Expectations;
import org.jmock.lib.action.DoAllAction;
@@ -20,6 +24,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@@ -28,11 +33,14 @@ import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleS
import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID;
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
import static org.briarproject.bramble.mailbox.MailboxFileManagerImpl.DOWNLOAD_DIR_NAME;
import static org.briarproject.bramble.mailbox.MailboxFileManagerImpl.UPLOAD_DIR_NAME;
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class MailboxFileManagerImplTest extends BrambleMockTestCase {
@@ -47,6 +55,10 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
private final SimplexPlugin plugin = context.mock(SimplexPlugin.class);
private final TransportConnectionReader transportConnectionReader =
context.mock(TransportConnectionReader.class);
private final TransportConnectionWriter transportConnectionWriter =
context.mock(TransportConnectionWriter.class);
private final ContactId contactId = getContactId();
private File mailboxDir;
private MailboxFileManagerImpl manager;
@@ -65,17 +77,25 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
@Test
public void testHandlesOrphanedFilesAtStartup() throws Exception {
// Create an orphaned file, left behind at the previous shutdown
// Create an orphaned upload, left behind at the previous shutdown
File uploadDir = new File(mailboxDir, UPLOAD_DIR_NAME);
//noinspection ResultOfMethodCallIgnored
uploadDir.mkdirs();
File orphanedUpload = new File(uploadDir, "orphan");
assertTrue(orphanedUpload.createNewFile());
// Create an orphaned download, left behind at the previous shutdown
File downloadDir = new File(mailboxDir, DOWNLOAD_DIR_NAME);
//noinspection ResultOfMethodCallIgnored
downloadDir.mkdirs();
File orphan = new File(downloadDir, "orphan");
assertTrue(orphan.createNewFile());
File orphanedDownload = new File(downloadDir, "orphan");
assertTrue(orphanedDownload.createNewFile());
TransportProperties props = new TransportProperties();
props.put(PROP_PATH, orphan.getAbsolutePath());
props.put(PROP_PATH, orphanedDownload.getAbsolutePath());
// When the plugin becomes active the orphaned file should be handled
// When the plugin becomes active the orphaned upload should be deleted
// and the orphaned download should be handled
context.checking(new Expectations() {{
oneOf(ioExecutor).execute(with(any(Runnable.class)));
will(new RunAction());
@@ -90,10 +110,12 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
}});
manager.eventOccurred(new TransportActiveEvent(ID));
assertFalse(orphanedUpload.exists());
}
@Test
public void testDeletesFileWhenReadSucceeds() throws Exception {
public void testDeletesDownloadedFileWhenReadSucceeds() throws Exception {
expectCheckForOrphans();
manager.eventOccurred(new TransportActiveEvent(ID));
@@ -102,7 +124,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
new AtomicReference<>(null);
AtomicReference<TagController> controller = new AtomicReference<>(null);
expectPassFileToConnectionManager(f, reader, controller);
expectPassDownloadedFileToConnectionManager(f, reader, controller);
manager.handleDownloadedFile(f);
// The read is successful, so the tag controller should allow the tag
@@ -117,29 +139,117 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
}
@Test
public void testDeletesFileWhenTagIsNotRecognised() throws Exception {
testDeletesFile(false, RUNNING, false);
}
@Test
public void testDeletesFileWhenReadFails() throws Exception {
testDeletesFile(true, RUNNING, false);
}
@Test
public void testDoesNotDeleteFileWhenTagIsNotRecognisedAtShutdown()
public void testDeletesDownloadedFileWhenTagIsNotRecognised()
throws Exception {
testDeletesFile(false, STOPPING, true);
testDeletesDownloadedFile(false, RUNNING, false);
}
@Test
public void testDoesNotDeleteFileWhenReadFailsAtShutdown()
throws Exception {
testDeletesFile(true, STOPPING, true);
public void testDeletesDownloadedFileWhenReadFails() throws Exception {
testDeletesDownloadedFile(true, RUNNING, false);
}
private void testDeletesFile(boolean recognised, LifecycleState state,
boolean fileExists) throws Exception {
@Test
public void testDoesNotDeleteDownloadedFileWhenTagIsNotRecognisedAtShutdown()
throws Exception {
testDeletesDownloadedFile(false, STOPPING, true);
}
@Test
public void testDoesNotDeleteDownloadedFileWhenReadFailsAtShutdown()
throws Exception {
testDeletesDownloadedFile(true, STOPPING, true);
}
@Test(expected = IOException.class)
public void testThrowsExceptionIfPluginFailsToCreateWriter()
throws Exception {
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
expectCheckForOrphans();
manager.eventOccurred(new TransportActiveEvent(ID));
context.checking(new Expectations() {{
oneOf(pluginManager).getPlugin(ID);
will(returnValue(plugin));
oneOf(plugin).createWriter(with(any(TransportProperties.class)));
will(returnValue(null));
}});
manager.createAndWriteTempFileForUpload(contactId, sessionRecord);
}
@Test(expected = IOException.class)
public void testThrowsExceptionIfSessionFailsWithException()
throws Exception {
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
expectCheckForOrphans();
manager.eventOccurred(new TransportActiveEvent(ID));
context.checking(new Expectations() {{
oneOf(pluginManager).getPlugin(ID);
will(returnValue(plugin));
oneOf(plugin).createWriter(with(any(TransportProperties.class)));
will(returnValue(transportConnectionWriter));
oneOf(transportConnectionWriter).dispose(true);
oneOf(connectionManager).manageOutgoingConnection(with(contactId),
with(ID), with(any(TransportConnectionWriter.class)),
with(sessionRecord));
// The session fails with an exception. We need to use an action
// for this, as createAndWriteTempFileForUpload() waits for it to
// happen before returning
will(new ConsumeArgumentAction<>(TransportConnectionWriter.class, 2,
writer -> {
try {
writer.dispose(true);
} catch (IOException e) {
fail();
}
}
));
}});
manager.createAndWriteTempFileForUpload(contactId, sessionRecord);
}
@Test
public void testReturnsFileIfSessionSucceeds() throws Exception {
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
expectCheckForOrphans();
manager.eventOccurred(new TransportActiveEvent(ID));
context.checking(new Expectations() {{
oneOf(pluginManager).getPlugin(ID);
will(returnValue(plugin));
oneOf(plugin).createWriter(with(any(TransportProperties.class)));
will(returnValue(transportConnectionWriter));
oneOf(transportConnectionWriter).dispose(false);
oneOf(connectionManager).manageOutgoingConnection(with(contactId),
with(ID), with(any(TransportConnectionWriter.class)),
with(sessionRecord));
// The session succeeds. We need to use an action for this, as
// createAndWriteTempFileForUpload() waits for it to happen before
// returning
will(new ConsumeArgumentAction<>(TransportConnectionWriter.class, 2,
writer -> {
try {
writer.dispose(false);
} catch (IOException e) {
fail();
}
}
));
}});
File f = manager.createAndWriteTempFileForUpload(contactId,
sessionRecord);
assertTrue(f.exists());
}
private void testDeletesDownloadedFile(boolean recognised,
LifecycleState state, boolean fileExists) throws Exception {
expectCheckForOrphans();
manager.eventOccurred(new TransportActiveEvent(ID));
@@ -148,7 +258,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
new AtomicReference<>(null);
AtomicReference<TagController> controller = new AtomicReference<>(null);
expectPassFileToConnectionManager(f, reader, controller);
expectPassDownloadedFileToConnectionManager(f, reader, controller);
manager.handleDownloadedFile(f);
context.checking(new Expectations() {{
@@ -169,7 +279,7 @@ public class MailboxFileManagerImplTest extends BrambleMockTestCase {
}});
}
private void expectPassFileToConnectionManager(File f,
private void expectPassDownloadedFileToConnectionManager(File f,
AtomicReference<TransportConnectionReader> reader,
AtomicReference<TagController> controller) {
TransportProperties props = new TransportProperties();

View File

@@ -0,0 +1,32 @@
package org.briarproject.bramble.test;
import org.briarproject.bramble.api.Consumer;
import org.hamcrest.Description;
import org.jmock.api.Action;
import org.jmock.api.Invocation;
public class ConsumeArgumentAction<T> implements Action {
private final Class<T> capturedClass;
private final int index;
private final Consumer<T> consumer;
public ConsumeArgumentAction(Class<T> capturedClass, int index,
Consumer<T> consumer) {
this.capturedClass = capturedClass;
this.index = index;
this.consumer = consumer;
}
@Override
public Object invoke(Invocation invocation) {
consumer.accept(capturedClass.cast(invocation.getParameter(index)));
return null;
}
@Override
public void describeTo(Description description) {
description.appendText("passes an argument to a consumer");
}
}