From e420201b00f31f3ace4fb25a4d2a41b12aa0cb55 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Fri, 7 May 2021 14:48:25 +0100 Subject: [PATCH] Implement RemovableDriveWriterTask, except for progress updates. --- .../plugin/file/RemovableDriveReaderTask.java | 15 +--- .../file/RemovableDriveTaskFactoryImpl.java | 6 +- .../plugin/file/RemovableDriveTaskImpl.java | 10 +++ .../plugin/file/RemovableDriveWriterTask.java | 84 +++++++++++++++---- 4 files changed, 85 insertions(+), 30 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java index 2e0f02e0b..e51743332 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java @@ -14,10 +14,8 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; -import static java.lang.Math.min; import static java.util.logging.Logger.getLogger; import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID; @@ -28,9 +26,6 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl private final static Logger LOG = getLogger(RemovableDriveReaderTask.class.getName()); - private final AtomicLong fileLength = new AtomicLong(0); - private final AtomicLong totalMessageLength = new AtomicLong(0); - RemovableDriveReaderTask( Executor eventExecutor, PluginManager pluginManager, @@ -49,11 +44,11 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl getPlugin().createReader(createProperties()); if (r == null) { LOG.warning("Failed to create reader"); - registry.removeReader(contactId, RemovableDriveReaderTask.this); + registry.removeReader(contactId, this); visitObservers(o -> o.onCompletion(false)); return; } - fileLength.set(file.length()); + progressTotal.set(file.length()); eventBus.addListener(this); connectionManager.manageIncomingConnection(ID, new DecoratedReader(r)); } @@ -69,12 +64,6 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl } } - private void updateProgress(int messageLength) { - long done = totalMessageLength.addAndGet(messageLength); - long total = fileLength.get(); - visitObservers(o -> o.onProgress(min(done, total), total)); - } - private class DecoratedReader implements TransportConnectionReader { private final TransportConnectionReader delegate; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java index ad1f6ee06..dbf4b3bc7 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskFactoryImpl.java @@ -2,6 +2,7 @@ package org.briarproject.bramble.plugin.file; import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; @@ -18,6 +19,7 @@ import javax.inject.Inject; @NotNullByDefault class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory { + private final DatabaseComponent db; private final Executor eventExecutor; private final PluginManager pluginManager; private final ConnectionManager connectionManager; @@ -25,10 +27,12 @@ class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory { @Inject RemovableDriveTaskFactoryImpl( + DatabaseComponent db, @EventExecutor Executor eventExecutor, PluginManager pluginManager, ConnectionManager connectionManager, EventBus eventBus) { + this.db = db; this.eventExecutor = eventExecutor; this.pluginManager = pluginManager; this.connectionManager = connectionManager; @@ -45,7 +49,7 @@ class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory { @Override public RemovableDriveTask createWriter(RemovableDriveTaskRegistry registry, ContactId c, File f) { - return new RemovableDriveWriterTask(eventExecutor, pluginManager, + return new RemovableDriveWriterTask(db, eventExecutor, pluginManager, connectionManager, eventBus, registry, c, f); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java index 4984e3876..f7a919ed7 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java @@ -14,9 +14,11 @@ import java.io.File; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.ThreadSafe; +import static java.lang.Math.min; import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH; import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID; @@ -33,6 +35,8 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask { final ContactId contactId; final File file; private final List observers = new CopyOnWriteArrayList<>(); + final AtomicLong progressTotal = new AtomicLong(0); + private final AtomicLong progressDone = new AtomicLong(0); RemovableDriveTaskImpl( Executor eventExecutor, @@ -81,4 +85,10 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask { p.put(PROP_PATH, file.getAbsolutePath()); return p; } + + void updateProgress(long progress) { + long done = progressDone.addAndGet(progress); + long total = progressTotal.get(); + visitObservers(o -> o.onProgress(min(done, total), total)); + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java index 858d6a19c..671921607 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java @@ -2,29 +2,36 @@ package org.briarproject.bramble.plugin.file; import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.event.EventListener; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.PluginManager; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.sync.event.MessagesSentEvent; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.Executor; import java.util.logging.Logger; -import static java.util.logging.Level.WARNING; +import static java.util.logging.Level.INFO; import static java.util.logging.Logger.getLogger; -import static org.briarproject.bramble.util.IoUtils.tryToClose; -import static org.briarproject.bramble.util.LogUtils.logException; +import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID; @NotNullByDefault -class RemovableDriveWriterTask extends RemovableDriveTaskImpl { +class RemovableDriveWriterTask extends RemovableDriveTaskImpl + implements EventListener { private static final Logger LOG = getLogger(RemovableDriveWriterTask.class.getName()); + private final DatabaseComponent db; + RemovableDriveWriterTask( + DatabaseComponent db, Executor eventExecutor, PluginManager pluginManager, ConnectionManager connectionManager, @@ -34,22 +41,67 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl { File file) { super(eventExecutor, pluginManager, connectionManager, eventBus, registry, contactId, file); + this.db = db; } @Override public void run() { - // TODO - OutputStream out = null; - try { - visitObservers(o -> o.onProgress(0, 100)); - out = new FileOutputStream(file); - visitObservers(o -> o.onCompletion(true)); - } catch (IOException e) { - logException(LOG, WARNING, e); - visitObservers(o -> o.onCompletion(false)); - } finally { - tryToClose(out, LOG, WARNING); + TransportConnectionWriter w = + getPlugin().createWriter(createProperties()); + if (w == null) { + LOG.warning("Failed to create writer"); registry.removeWriter(contactId, this); + visitObservers(o -> o.onCompletion(false)); + return; + } + // TODO: Get total bytes to send from DB + eventBus.addListener(this); + connectionManager.manageOutgoingConnection(contactId, ID, + new DecoratedWriter(w)); + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof MessagesSentEvent) { + MessagesSentEvent m = (MessagesSentEvent) e; + if (contactId.equals(m.getContactId())) { + if (LOG.isLoggable(INFO)) { + LOG.info(m.getMessageIds().size() + " messages sent"); + } + // TODO: Update progress + } + } + } + + private class DecoratedWriter implements TransportConnectionWriter { + + private final TransportConnectionWriter delegate; + + private DecoratedWriter(TransportConnectionWriter delegate) { + this.delegate = delegate; + } + + @Override + public int getMaxLatency() { + return delegate.getMaxLatency(); + } + + @Override + public int getMaxIdleTime() { + return delegate.getMaxIdleTime(); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return delegate.getOutputStream(); + } + + @Override + public void dispose(boolean exception) throws IOException { + delegate.dispose(exception); + registry.removeWriter(contactId, RemovableDriveWriterTask.this); + eventBus.removeListener(RemovableDriveWriterTask.this); + visitObservers(o -> o.onCompletion(!exception)); } } }