Implement RemovableDriveWriterTask, except for progress updates.

This commit is contained in:
akwizgran
2021-05-07 14:48:25 +01:00
parent 03248d04e5
commit e420201b00
4 changed files with 85 additions and 30 deletions

View File

@@ -14,10 +14,8 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import static java.lang.Math.min;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID;
@@ -28,9 +26,6 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
private final static Logger LOG =
getLogger(RemovableDriveReaderTask.class.getName());
private final AtomicLong fileLength = new AtomicLong(0);
private final AtomicLong totalMessageLength = new AtomicLong(0);
RemovableDriveReaderTask(
Executor eventExecutor,
PluginManager pluginManager,
@@ -49,11 +44,11 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
getPlugin().createReader(createProperties());
if (r == null) {
LOG.warning("Failed to create reader");
registry.removeReader(contactId, RemovableDriveReaderTask.this);
registry.removeReader(contactId, this);
visitObservers(o -> o.onCompletion(false));
return;
}
fileLength.set(file.length());
progressTotal.set(file.length());
eventBus.addListener(this);
connectionManager.manageIncomingConnection(ID, new DecoratedReader(r));
}
@@ -69,12 +64,6 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
}
}
private void updateProgress(int messageLength) {
long done = totalMessageLength.addAndGet(messageLength);
long total = fileLength.get();
visitObservers(o -> o.onProgress(min(done, total), total));
}
private class DecoratedReader implements TransportConnectionReader {
private final TransportConnectionReader delegate;

View File

@@ -2,6 +2,7 @@ package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
@@ -18,6 +19,7 @@ import javax.inject.Inject;
@NotNullByDefault
class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory {
private final DatabaseComponent db;
private final Executor eventExecutor;
private final PluginManager pluginManager;
private final ConnectionManager connectionManager;
@@ -25,10 +27,12 @@ class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory {
@Inject
RemovableDriveTaskFactoryImpl(
DatabaseComponent db,
@EventExecutor Executor eventExecutor,
PluginManager pluginManager,
ConnectionManager connectionManager,
EventBus eventBus) {
this.db = db;
this.eventExecutor = eventExecutor;
this.pluginManager = pluginManager;
this.connectionManager = connectionManager;
@@ -45,7 +49,7 @@ class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory {
@Override
public RemovableDriveTask createWriter(RemovableDriveTaskRegistry registry,
ContactId c, File f) {
return new RemovableDriveWriterTask(eventExecutor, pluginManager,
return new RemovableDriveWriterTask(db, eventExecutor, pluginManager,
connectionManager, eventBus, registry, c, f);
}
}

View File

@@ -14,9 +14,11 @@ import java.io.File;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import static java.lang.Math.min;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID;
@@ -33,6 +35,8 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
final ContactId contactId;
final File file;
private final List<Observer> observers = new CopyOnWriteArrayList<>();
final AtomicLong progressTotal = new AtomicLong(0);
private final AtomicLong progressDone = new AtomicLong(0);
RemovableDriveTaskImpl(
Executor eventExecutor,
@@ -81,4 +85,10 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
p.put(PROP_PATH, file.getAbsolutePath());
return p;
}
void updateProgress(long progress) {
long done = progressDone.addAndGet(progress);
long total = progressTotal.get();
visitObservers(o -> o.onProgress(min(done, total), total));
}
}

View File

@@ -2,29 +2,36 @@ package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Level.INFO;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.util.IoUtils.tryToClose;
import static org.briarproject.bramble.util.LogUtils.logException;
import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID;
@NotNullByDefault
class RemovableDriveWriterTask extends RemovableDriveTaskImpl {
class RemovableDriveWriterTask extends RemovableDriveTaskImpl
implements EventListener {
private static final Logger LOG =
getLogger(RemovableDriveWriterTask.class.getName());
private final DatabaseComponent db;
RemovableDriveWriterTask(
DatabaseComponent db,
Executor eventExecutor,
PluginManager pluginManager,
ConnectionManager connectionManager,
@@ -34,22 +41,67 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl {
File file) {
super(eventExecutor, pluginManager, connectionManager, eventBus,
registry, contactId, file);
this.db = db;
}
@Override
public void run() {
// TODO
OutputStream out = null;
try {
visitObservers(o -> o.onProgress(0, 100));
out = new FileOutputStream(file);
visitObservers(o -> o.onCompletion(true));
} catch (IOException e) {
logException(LOG, WARNING, e);
visitObservers(o -> o.onCompletion(false));
} finally {
tryToClose(out, LOG, WARNING);
TransportConnectionWriter w =
getPlugin().createWriter(createProperties());
if (w == null) {
LOG.warning("Failed to create writer");
registry.removeWriter(contactId, this);
visitObservers(o -> o.onCompletion(false));
return;
}
// TODO: Get total bytes to send from DB
eventBus.addListener(this);
connectionManager.manageOutgoingConnection(contactId, ID,
new DecoratedWriter(w));
}
@Override
public void eventOccurred(Event e) {
if (e instanceof MessagesSentEvent) {
MessagesSentEvent m = (MessagesSentEvent) e;
if (contactId.equals(m.getContactId())) {
if (LOG.isLoggable(INFO)) {
LOG.info(m.getMessageIds().size() + " messages sent");
}
// TODO: Update progress
}
}
}
private class DecoratedWriter implements TransportConnectionWriter {
private final TransportConnectionWriter delegate;
private DecoratedWriter(TransportConnectionWriter delegate) {
this.delegate = delegate;
}
@Override
public int getMaxLatency() {
return delegate.getMaxLatency();
}
@Override
public int getMaxIdleTime() {
return delegate.getMaxIdleTime();
}
@Override
public OutputStream getOutputStream() throws IOException {
return delegate.getOutputStream();
}
@Override
public void dispose(boolean exception) throws IOException {
delegate.dispose(exception);
registry.removeWriter(contactId, RemovableDriveWriterTask.this);
eventBus.removeListener(RemovableDriveWriterTask.this);
visitObservers(o -> o.onCompletion(!exception));
}
}
}