Implement RemovableDriverReaderTask.

This commit is contained in:
akwizgran
2021-05-07 14:20:30 +01:00
parent c9c6f3682c
commit 2c39b02644
5 changed files with 138 additions and 30 deletions

View File

@@ -26,7 +26,7 @@ public interface RemovableDriveTask extends Runnable {
interface Observer {
@EventExecutor
void onProgress(long written, long total);
void onProgress(long done, long total);
@EventExecutor
void onCompletion(boolean success);

View File

@@ -1,8 +1,11 @@
package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
import java.io.File;
@@ -16,21 +19,33 @@ import javax.inject.Inject;
class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory {
private final Executor eventExecutor;
private final PluginManager pluginManager;
private final ConnectionManager connectionManager;
private final EventBus eventBus;
@Inject
RemovableDriveTaskFactoryImpl(@EventExecutor Executor eventExecutor) {
RemovableDriveTaskFactoryImpl(
@EventExecutor Executor eventExecutor,
PluginManager pluginManager,
ConnectionManager connectionManager,
EventBus eventBus) {
this.eventExecutor = eventExecutor;
this.pluginManager = pluginManager;
this.connectionManager = connectionManager;
this.eventBus = eventBus;
}
@Override
public RemovableDriveTask createReader(RemovableDriveTaskRegistry registry,
ContactId c, File f) {
return new RemovableDriverReaderTask(eventExecutor, registry, c, f);
return new RemovableDriverReaderTask(eventExecutor, pluginManager,
connectionManager, eventBus, registry, c, f);
}
@Override
public RemovableDriveTask createWriter(RemovableDriveTaskRegistry registry,
ContactId c, File f) {
return new RemovableDriverWriterTask(eventExecutor, registry, c, f);
return new RemovableDriverWriterTask(eventExecutor, pluginManager,
connectionManager, eventBus, registry, c, f);
}
}

View File

@@ -1,9 +1,14 @@
package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.Consumer;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import org.briarproject.bramble.api.properties.TransportProperties;
import java.io.File;
import java.util.List;
@@ -12,23 +17,38 @@ import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
import static org.briarproject.bramble.api.plugin.file.FileConstants.PROP_PATH;
import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID;
@ThreadSafe
@NotNullByDefault
abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
private final Executor eventExecutor;
private final PluginManager pluginManager;
final ConnectionManager connectionManager;
final EventBus eventBus;
final RemovableDriveTaskRegistry registry;
final ContactId contactId;
final File file;
private final List<Observer> observers = new CopyOnWriteArrayList<>();
RemovableDriveTaskImpl(Executor eventExecutor,
RemovableDriveTaskRegistry registry, ContactId contactId,
RemovableDriveTaskImpl(
Executor eventExecutor,
PluginManager pluginManager,
ConnectionManager connectionManager,
EventBus eventBus,
RemovableDriveTaskRegistry registry,
ContactId contactId,
File file) {
this.eventExecutor = eventExecutor;
this.pluginManager = pluginManager;
this.connectionManager = connectionManager;
this.eventBus = eventBus;
this.registry = registry;
this.contactId = contactId;
this.file = file;
this.registry = registry;
this.eventExecutor = eventExecutor;
}
@Override
@@ -51,4 +71,14 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
for (Observer o : observers) visitor.accept(o);
});
}
SimplexPlugin getPlugin() {
return (SimplexPlugin) requireNonNull(pluginManager.getPlugin(ID));
}
TransportProperties createProperties() {
TransportProperties p = new TransportProperties();
p.put(PROP_PATH, file.getAbsolutePath());
return p;
}
}

View File

@@ -1,46 +1,100 @@
package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.lang.Math.min;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.util.IoUtils.tryToClose;
import static org.briarproject.bramble.util.LogUtils.logException;
import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID;
@NotNullByDefault
class RemovableDriverReaderTask extends RemovableDriveTaskImpl {
class RemovableDriverReaderTask extends RemovableDriveTaskImpl
implements EventListener {
private final static Logger LOG =
getLogger(RemovableDriverReaderTask.class.getName());
RemovableDriverReaderTask(Executor eventExecutor,
RemovableDriveTaskRegistry registry, ContactId contactId,
private final AtomicLong fileLength = new AtomicLong(0);
private final AtomicLong totalMessageLength = new AtomicLong(0);
RemovableDriverReaderTask(
Executor eventExecutor,
PluginManager pluginManager,
ConnectionManager connectionManager,
EventBus eventBus,
RemovableDriveTaskRegistry registry,
ContactId contactId,
File file) {
super(eventExecutor, registry, contactId, file);
super(eventExecutor, pluginManager, connectionManager, eventBus,
registry, contactId, file);
}
@Override
public void run() {
// TODO
InputStream in = null;
try {
visitObservers(o -> o.onProgress(0, 100));
in = new FileInputStream(file);
visitObservers(o -> o.onCompletion(true));
} catch (IOException e) {
logException(LOG, WARNING, e);
TransportConnectionReader r =
getPlugin().createReader(createProperties());
if (r == null) {
LOG.warning("Failed to create reader");
registry.removeReader(contactId, RemovableDriverReaderTask.this);
visitObservers(o -> o.onCompletion(false));
} finally {
tryToClose(in, LOG, WARNING);
registry.removeReader(contactId, this);
return;
}
fileLength.set(file.length());
eventBus.addListener(this);
connectionManager.manageIncomingConnection(ID, new DecoratedReader(r));
}
@Override
public void eventOccurred(Event e) {
if (e instanceof MessageAddedEvent) {
MessageAddedEvent m = (MessageAddedEvent) e;
if (contactId.equals(m.getContactId())) {
LOG.info("Message received");
updateProgress(m.getMessage().getRawLength());
}
}
}
private void updateProgress(int messageLength) {
long done = totalMessageLength.addAndGet(messageLength);
long total = fileLength.get();
visitObservers(o -> o.onProgress(min(done, total), total));
}
private class DecoratedReader implements TransportConnectionReader {
private final TransportConnectionReader delegate;
private DecoratedReader(TransportConnectionReader delegate) {
this.delegate = delegate;
}
@Override
public InputStream getInputStream() throws IOException {
return delegate.getInputStream();
}
@Override
public void dispose(boolean exception, boolean recognised)
throws IOException {
delegate.dispose(exception, recognised);
registry.removeReader(contactId, RemovableDriverReaderTask.this);
eventBus.removeListener(RemovableDriverReaderTask.this);
visitObservers(o -> o.onCompletion(!exception && recognised));
}
}
}

View File

@@ -1,7 +1,10 @@
package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
import java.io.File;
import java.io.FileOutputStream;
@@ -21,10 +24,16 @@ class RemovableDriverWriterTask extends RemovableDriveTaskImpl {
private static final Logger LOG =
getLogger(RemovableDriverWriterTask.class.getName());
RemovableDriverWriterTask(Executor eventExecutor,
RemovableDriveTaskRegistry registry, ContactId contactId,
RemovableDriverWriterTask(
Executor eventExecutor,
PluginManager pluginManager,
ConnectionManager connectionManager,
EventBus eventBus,
RemovableDriveTaskRegistry registry,
ContactId contactId,
File file) {
super(eventExecutor, registry, contactId, file);
super(eventExecutor, pluginManager, connectionManager, eventBus,
registry, contactId, file);
}
@Override