From 2c39b02644f59314b23323102cf6631efc9fbf48 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Fri, 7 May 2021 14:20:30 +0100 Subject: [PATCH] Implement RemovableDriverReaderTask. --- .../api/plugin/file/RemovableDriveTask.java | 2 +- .../file/RemovableDriveTaskFactoryImpl.java | 21 ++++- .../plugin/file/RemovableDriveTaskImpl.java | 38 +++++++- .../file/RemovableDriverReaderTask.java | 92 +++++++++++++++---- .../file/RemovableDriverWriterTask.java | 15 ++- 5 files changed, 138 insertions(+), 30 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java index 3bc4c1837..c4f2e00a9 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java @@ -26,7 +26,7 @@ public interface RemovableDriveTask extends Runnable { interface Observer { @EventExecutor - void onProgress(long written, long total); + void onProgress(long done, long total); @EventExecutor void onCompletion(boolean success); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java index b46f659d4..d8021ff82 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java @@ -1,8 +1,11 @@ package org.briarproject.bramble.plugin.file; +import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.file.RemovableDriveTask; import java.io.File; @@ -16,21 +19,33 @@ import javax.inject.Inject; class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory { private final Executor eventExecutor; + private final PluginManager pluginManager; + private final ConnectionManager connectionManager; + private final EventBus eventBus; @Inject - RemovableDriveTaskFactoryImpl(@EventExecutor Executor eventExecutor) { + RemovableDriveTaskFactoryImpl( + @EventExecutor Executor eventExecutor, + PluginManager pluginManager, + ConnectionManager connectionManager, + EventBus eventBus) { this.eventExecutor = eventExecutor; + this.pluginManager = pluginManager; + this.connectionManager = connectionManager; + this.eventBus = eventBus; } @Override public RemovableDriveTask createReader(RemovableDriveTaskRegistry registry, ContactId c, File f) { - return new RemovableDriverReaderTask(eventExecutor, registry, c, f); + return new RemovableDriverReaderTask(eventExecutor, pluginManager, + connectionManager, eventBus, registry, c, f); } @Override public RemovableDriveTask createWriter(RemovableDriveTaskRegistry registry, ContactId c, File f) { - return new RemovableDriverWriterTask(eventExecutor, registry, c, f); + return new RemovableDriverWriterTask(eventExecutor, pluginManager, + connectionManager, eventBus, registry, c, f); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java index e5f09f971..4984e3876 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java @@ -1,9 +1,14 @@ package org.briarproject.bramble.plugin.file; import org.briarproject.bramble.api.Consumer; +import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.file.RemovableDriveTask; +import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; +import org.briarproject.bramble.api.properties.TransportProperties; import java.io.File; import java.util.List; @@ -12,23 +17,38 @@ import java.util.concurrent.Executor; import javax.annotation.concurrent.ThreadSafe; +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; +import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH; +import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID; + @ThreadSafe @NotNullByDefault abstract class RemovableDriveTaskImpl implements RemovableDriveTask { private final Executor eventExecutor; + private final PluginManager pluginManager; + final ConnectionManager connectionManager; + final EventBus eventBus; final RemovableDriveTaskRegistry registry; final ContactId contactId; final File file; private final List observers = new CopyOnWriteArrayList<>(); - RemovableDriveTaskImpl(Executor eventExecutor, - RemovableDriveTaskRegistry registry, ContactId contactId, + RemovableDriveTaskImpl( + Executor eventExecutor, + PluginManager pluginManager, + ConnectionManager connectionManager, + EventBus eventBus, + RemovableDriveTaskRegistry registry, + ContactId contactId, File file) { + this.eventExecutor = eventExecutor; + this.pluginManager = pluginManager; + this.connectionManager = connectionManager; + this.eventBus = eventBus; + this.registry = registry; this.contactId = contactId; this.file = file; - this.registry = registry; - this.eventExecutor = eventExecutor; } @Override @@ -51,4 +71,14 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask { for (Observer o : observers) visitor.accept(o); }); } + + SimplexPlugin getPlugin() { + return (SimplexPlugin) requireNonNull(pluginManager.getPlugin(ID)); + } + + TransportProperties createProperties() { + TransportProperties p = new TransportProperties(); + p.put(PROP_PATH, file.getAbsolutePath()); + return p; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverReaderTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverReaderTask.java index 3cc53c7b4..a7374fc53 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverReaderTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverReaderTask.java @@ -1,46 +1,100 @@ package org.briarproject.bramble.plugin.file; +import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.event.EventListener; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.PluginManager; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.sync.event.MessageAddedEvent; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; -import static java.util.logging.Level.WARNING; +import static java.lang.Math.min; import static java.util.logging.Logger.getLogger; -import static org.briarproject.bramble.util.IoUtils.tryToClose; -import static org.briarproject.bramble.util.LogUtils.logException; +import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID; @NotNullByDefault -class RemovableDriverReaderTask extends RemovableDriveTaskImpl { +class RemovableDriverReaderTask extends RemovableDriveTaskImpl + implements EventListener { private final static Logger LOG = getLogger(RemovableDriverReaderTask.class.getName()); - RemovableDriverReaderTask(Executor eventExecutor, - RemovableDriveTaskRegistry registry, ContactId contactId, + private final AtomicLong fileLength = new AtomicLong(0); + private final AtomicLong totalMessageLength = new AtomicLong(0); + + RemovableDriverReaderTask( + Executor eventExecutor, + PluginManager pluginManager, + ConnectionManager connectionManager, + EventBus eventBus, + RemovableDriveTaskRegistry registry, + ContactId contactId, File file) { - super(eventExecutor, registry, contactId, file); + super(eventExecutor, pluginManager, connectionManager, eventBus, + registry, contactId, file); } @Override public void run() { - // TODO - InputStream in = null; - try { - visitObservers(o -> o.onProgress(0, 100)); - in = new FileInputStream(file); - visitObservers(o -> o.onCompletion(true)); - } catch (IOException e) { - logException(LOG, WARNING, e); + TransportConnectionReader r = + getPlugin().createReader(createProperties()); + if (r == null) { + LOG.warning("Failed to create reader"); + registry.removeReader(contactId, RemovableDriverReaderTask.this); visitObservers(o -> o.onCompletion(false)); - } finally { - tryToClose(in, LOG, WARNING); - registry.removeReader(contactId, this); + return; + } + fileLength.set(file.length()); + eventBus.addListener(this); + connectionManager.manageIncomingConnection(ID, new DecoratedReader(r)); + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof MessageAddedEvent) { + MessageAddedEvent m = (MessageAddedEvent) e; + if (contactId.equals(m.getContactId())) { + LOG.info("Message received"); + updateProgress(m.getMessage().getRawLength()); + } + } + } + + private void updateProgress(int messageLength) { + long done = totalMessageLength.addAndGet(messageLength); + long total = fileLength.get(); + visitObservers(o -> o.onProgress(min(done, total), total)); + } + + private class DecoratedReader implements TransportConnectionReader { + + private final TransportConnectionReader delegate; + + private DecoratedReader(TransportConnectionReader delegate) { + this.delegate = delegate; + } + + @Override + public InputStream getInputStream() throws IOException { + return delegate.getInputStream(); + } + + @Override + public void dispose(boolean exception, boolean recognised) + throws IOException { + delegate.dispose(exception, recognised); + registry.removeReader(contactId, RemovableDriverReaderTask.this); + eventBus.removeListener(RemovableDriverReaderTask.this); + visitObservers(o -> o.onCompletion(!exception && recognised)); } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverWriterTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverWriterTask.java index ed0715d16..7394b1ac2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverWriterTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriverWriterTask.java @@ -1,7 +1,10 @@ package org.briarproject.bramble.plugin.file; +import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.PluginManager; import java.io.File; import java.io.FileOutputStream; @@ -21,10 +24,16 @@ class RemovableDriverWriterTask extends RemovableDriveTaskImpl { private static final Logger LOG = getLogger(RemovableDriverWriterTask.class.getName()); - RemovableDriverWriterTask(Executor eventExecutor, - RemovableDriveTaskRegistry registry, ContactId contactId, + RemovableDriverWriterTask( + Executor eventExecutor, + PluginManager pluginManager, + ConnectionManager connectionManager, + EventBus eventBus, + RemovableDriveTaskRegistry registry, + ContactId contactId, File file) { - super(eventExecutor, registry, contactId, file); + super(eventExecutor, pluginManager, connectionManager, eventBus, + registry, contactId, file); } @Override