Merge branch '2071-removable-drive-task-refactoring' into '1802-sync-via-removable-storage'

Refactor removable drive task management

See merge request briar/briar!1480
This commit is contained in:
Torsten Grote
2021-06-15 12:23:27 +00:00
11 changed files with 80 additions and 96 deletions

View File

@@ -10,32 +10,31 @@ import javax.annotation.Nullable;
public interface RemovableDriveManager { public interface RemovableDriveManager {
/** /**
* Returns the currently running reader task for the given contact, * Returns the currently running reader task, or null if no reader task
* or null if no task is running. * is running.
*/ */
@Nullable @Nullable
RemovableDriveTask getCurrentReaderTask(ContactId c); RemovableDriveTask getCurrentReaderTask();
/** /**
* Returns the currently running writer task for the given contact, * Returns the currently running writer task, or null if no writer task
* or null if no task is running. * is running.
*/ */
@Nullable @Nullable
RemovableDriveTask getCurrentWriterTask(ContactId c); RemovableDriveTask getCurrentWriterTask();
/** /**
* Starts and returns a reader task for the given contact, reading from * Starts and returns a reader task, reading from a stream described by
* a stream described by the given transport properties. If a reader task * the given transport properties. If a reader task is already running,
* for the contact is already running, it will be returned and the * it will be returned and the argument will be ignored.
* transport properties will be ignored.
*/ */
RemovableDriveTask startReaderTask(ContactId c, TransportProperties p); RemovableDriveTask startReaderTask(TransportProperties p);
/** /**
* Starts and returns a writer task for the given contact, writing to * Starts and returns a writer task for the given contact, writing to
* a stream described by the given transport properties. If a writer task * a stream described by the given transport properties. If a writer task
* for the contact is already running, it will be returned and the * is already running, it will be returned and the arguments will be
* transport properties will be ignored. * ignored.
*/ */
RemovableDriveTask startWriterTask(ContactId c, TransportProperties p); RemovableDriveTask startWriterTask(ContactId c, TransportProperties p);
} }

View File

@@ -39,7 +39,7 @@ public interface RemovableDriveTask extends Runnable {
/** /**
* Returns the total length in bytes of the messages read or written * Returns the total length in bytes of the messages read or written
* so far. * so far, or zero if the total is unknown.
*/ */
public long getDone() { public long getDone() {
return done; return done;

View File

@@ -7,10 +7,10 @@ import org.briarproject.bramble.api.plugin.file.RemovableDriveManager;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask; import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportProperties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject; import javax.inject.Inject;
@@ -21,10 +21,12 @@ class RemovableDriveManagerImpl
private final Executor ioExecutor; private final Executor ioExecutor;
private final RemovableDriveTaskFactory taskFactory; private final RemovableDriveTaskFactory taskFactory;
private final ConcurrentHashMap<ContactId, RemovableDriveTask> private final Object lock = new Object();
readers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<ContactId, RemovableDriveTask> @GuardedBy("lock")
writers = new ConcurrentHashMap<>(); private RemovableDriveTask reader = null;
@GuardedBy("lock")
private RemovableDriveTask writer = null;
@Inject @Inject
RemovableDriveManagerImpl(@IoExecutor Executor ioExecutor, RemovableDriveManagerImpl(@IoExecutor Executor ioExecutor,
@@ -35,49 +37,54 @@ class RemovableDriveManagerImpl
@Nullable @Nullable
@Override @Override
public RemovableDriveTask getCurrentReaderTask(ContactId c) { public RemovableDriveTask getCurrentReaderTask() {
return readers.get(c); synchronized (lock) {
return reader;
}
} }
@Nullable @Nullable
@Override @Override
public RemovableDriveTask getCurrentWriterTask(ContactId c) { public RemovableDriveTask getCurrentWriterTask() {
return writers.get(c); synchronized (lock) {
return writer;
}
} }
@Override @Override
public RemovableDriveTask startReaderTask(ContactId c, public RemovableDriveTask startReaderTask(TransportProperties p) {
TransportProperties p) { RemovableDriveTask created;
RemovableDriveTask task = taskFactory.createReader(this, c, p); synchronized (lock) {
RemovableDriveTask old = readers.putIfAbsent(c, task); if (reader != null) return reader;
if (old == null) { reader = created = taskFactory.createReader(this, p);
ioExecutor.execute(task);
return task;
} else {
return old;
} }
ioExecutor.execute(created);
return created;
} }
@Override @Override
public RemovableDriveTask startWriterTask(ContactId c, public RemovableDriveTask startWriterTask(ContactId c,
TransportProperties p) { TransportProperties p) {
RemovableDriveTask task = taskFactory.createWriter(this, c, p); RemovableDriveTask created;
RemovableDriveTask old = writers.putIfAbsent(c, task); synchronized (lock) {
if (old == null) { if (writer != null) return writer;
ioExecutor.execute(task); writer = created = taskFactory.createWriter(this, c, p);
return task; }
} else { ioExecutor.execute(created);
return old; return created;
}
@Override
public void removeReader(RemovableDriveTask task) {
synchronized (lock) {
if (reader == task) reader = null;
} }
} }
@Override @Override
public void removeReader(ContactId c, RemovableDriveTask task) { public void removeWriter(RemovableDriveTask task) {
readers.remove(c, task); synchronized (lock) {
} if (writer == task) writer = null;
}
@Override
public void removeWriter(ContactId c, RemovableDriveTask task) {
writers.remove(c, task);
} }
} }

View File

@@ -1,15 +1,11 @@
package org.briarproject.bramble.plugin.file; package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@@ -20,8 +16,7 @@ import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID; import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID;
@NotNullByDefault @NotNullByDefault
class RemovableDriveReaderTask extends RemovableDriveTaskImpl class RemovableDriveReaderTask extends RemovableDriveTaskImpl {
implements EventListener {
private final static Logger LOG = private final static Logger LOG =
getLogger(RemovableDriveReaderTask.class.getName()); getLogger(RemovableDriveReaderTask.class.getName());
@@ -32,10 +27,9 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
ConnectionManager connectionManager, ConnectionManager connectionManager,
EventBus eventBus, EventBus eventBus,
RemovableDriveTaskRegistry registry, RemovableDriveTaskRegistry registry,
ContactId contactId,
TransportProperties transportProperties) { TransportProperties transportProperties) {
super(eventExecutor, pluginManager, connectionManager, eventBus, super(eventExecutor, pluginManager, connectionManager, eventBus,
registry, contactId, transportProperties); registry, transportProperties);
} }
@Override @Override
@@ -44,25 +38,13 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
getPlugin().createReader(transportProperties); getPlugin().createReader(transportProperties);
if (r == null) { if (r == null) {
LOG.warning("Failed to create reader"); LOG.warning("Failed to create reader");
registry.removeReader(contactId, this); registry.removeReader(this);
setSuccess(false); setSuccess(false);
return; return;
} }
eventBus.addListener(this);
connectionManager.manageIncomingConnection(ID, new DecoratedReader(r)); connectionManager.manageIncomingConnection(ID, new DecoratedReader(r));
} }
@Override
public void eventOccurred(Event e) {
if (e instanceof MessageAddedEvent) {
MessageAddedEvent m = (MessageAddedEvent) e;
if (contactId.equals(m.getContactId())) {
LOG.info("Message received");
addDone(m.getMessage().getRawLength());
}
}
}
private class DecoratedReader implements TransportConnectionReader { private class DecoratedReader implements TransportConnectionReader {
private final TransportConnectionReader delegate; private final TransportConnectionReader delegate;
@@ -80,8 +62,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
public void dispose(boolean exception, boolean recognised) public void dispose(boolean exception, boolean recognised)
throws IOException { throws IOException {
delegate.dispose(exception, recognised); delegate.dispose(exception, recognised);
registry.removeReader(contactId, RemovableDriveReaderTask.this); registry.removeReader(RemovableDriveReaderTask.this);
eventBus.removeListener(RemovableDriveReaderTask.this);
setSuccess(!exception && recognised); setSuccess(!exception && recognised);
} }
} }

View File

@@ -9,7 +9,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
interface RemovableDriveTaskFactory { interface RemovableDriveTaskFactory {
RemovableDriveTask createReader(RemovableDriveTaskRegistry registry, RemovableDriveTask createReader(RemovableDriveTaskRegistry registry,
ContactId c, TransportProperties p); TransportProperties p);
RemovableDriveTask createWriter(RemovableDriveTaskRegistry registry, RemovableDriveTask createWriter(RemovableDriveTaskRegistry registry,
ContactId c, TransportProperties p); ContactId c, TransportProperties p);

View File

@@ -41,9 +41,9 @@ class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory {
@Override @Override
public RemovableDriveTask createReader(RemovableDriveTaskRegistry registry, public RemovableDriveTask createReader(RemovableDriveTaskRegistry registry,
ContactId c, TransportProperties p) { TransportProperties p) {
return new RemovableDriveReaderTask(eventExecutor, pluginManager, return new RemovableDriveReaderTask(eventExecutor, pluginManager,
connectionManager, eventBus, registry, c, p); connectionManager, eventBus, registry, p);
} }
@Override @Override

View File

@@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.Consumer; import org.briarproject.bramble.api.Consumer;
import org.briarproject.bramble.api.connection.ConnectionManager; import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.PluginManager;
@@ -30,7 +29,6 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
final EventBus eventBus; final EventBus eventBus;
final RemovableDriveTaskRegistry registry; final RemovableDriveTaskRegistry registry;
final ContactId contactId;
final TransportProperties transportProperties; final TransportProperties transportProperties;
private final Object lock = new Object(); private final Object lock = new Object();
@@ -45,14 +43,12 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
ConnectionManager connectionManager, ConnectionManager connectionManager,
EventBus eventBus, EventBus eventBus,
RemovableDriveTaskRegistry registry, RemovableDriveTaskRegistry registry,
ContactId contactId,
TransportProperties transportProperties) { TransportProperties transportProperties) {
this.eventExecutor = eventExecutor; this.eventExecutor = eventExecutor;
this.pluginManager = pluginManager; this.pluginManager = pluginManager;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.eventBus = eventBus; this.eventBus = eventBus;
this.registry = registry; this.registry = registry;
this.contactId = contactId;
this.transportProperties = transportProperties; this.transportProperties = transportProperties;
} }
@@ -98,15 +94,15 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
done = min(state.getDone() + done, state.getTotal()); done = min(state.getDone() + done, state.getTotal());
state = new State(done, state.getTotal(), state.isFinished(), state = new State(done, state.getTotal(), state.isFinished(),
state.isSuccess()); state.isSuccess());
notifyObservers();
} }
notifyObservers();
} }
void setSuccess(boolean success) { void setSuccess(boolean success) {
synchronized (lock) { synchronized (lock) {
state = new State(state.getDone(), state.getTotal(), true, success); state = new State(state.getDone(), state.getTotal(), true, success);
notifyObservers();
} }
notifyObservers();
} }
@GuardedBy("lock") @GuardedBy("lock")

View File

@@ -1,13 +1,12 @@
package org.briarproject.bramble.plugin.file; package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask; import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
@NotNullByDefault @NotNullByDefault
interface RemovableDriveTaskRegistry { interface RemovableDriveTaskRegistry {
void removeReader(ContactId c, RemovableDriveTask task); void removeReader(RemovableDriveTask task);
void removeWriter(ContactId c, RemovableDriveTask task); void removeWriter(RemovableDriveTask task);
} }

View File

@@ -33,6 +33,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
getLogger(RemovableDriveWriterTask.class.getName()); getLogger(RemovableDriveWriterTask.class.getName());
private final DatabaseComponent db; private final DatabaseComponent db;
private final ContactId contactId;
RemovableDriveWriterTask( RemovableDriveWriterTask(
DatabaseComponent db, DatabaseComponent db,
@@ -44,8 +45,9 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
ContactId contactId, ContactId contactId,
TransportProperties transportProperties) { TransportProperties transportProperties) {
super(eventExecutor, pluginManager, connectionManager, eventBus, super(eventExecutor, pluginManager, connectionManager, eventBus,
registry, contactId, transportProperties); registry, transportProperties);
this.db = db; this.db = db;
this.contactId = contactId;
} }
@Override @Override
@@ -54,7 +56,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
TransportConnectionWriter w = plugin.createWriter(transportProperties); TransportConnectionWriter w = plugin.createWriter(transportProperties);
if (w == null) { if (w == null) {
LOG.warning("Failed to create writer"); LOG.warning("Failed to create writer");
registry.removeWriter(contactId, this); registry.removeWriter(this);
setSuccess(false); setSuccess(false);
return; return;
} }
@@ -64,7 +66,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
db.getMessageBytesToSend(txn, contactId, maxLatency))); db.getMessageBytesToSend(txn, contactId, maxLatency)));
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
registry.removeWriter(contactId, this); registry.removeWriter(this);
setSuccess(false); setSuccess(false);
return; return;
} }
@@ -112,7 +114,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
@Override @Override
public void dispose(boolean exception) throws IOException { public void dispose(boolean exception) throws IOException {
delegate.dispose(exception); delegate.dispose(exception);
registry.removeWriter(contactId, RemovableDriveWriterTask.this); registry.removeWriter(RemovableDriveWriterTask.this);
eventBus.removeListener(RemovableDriveWriterTask.this); eventBus.removeListener(RemovableDriveWriterTask.this);
setSuccess(!exception); setSuccess(!exception);
} }

View File

@@ -70,9 +70,9 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
ContactId aliceId = setUp(bob, bobIdentity, ContactId aliceId = setUp(bob, bobIdentity,
aliceIdentity.getLocalAuthor(), false); aliceIdentity.getLocalAuthor(), false);
// Sync Alice's client versions and transport properties // Sync Alice's client versions and transport properties
read(bob, aliceId, write(alice, bobId), 2); read(bob, write(alice, bobId), 2);
// Sync Bob's client versions and transport properties // Sync Bob's client versions and transport properties
read(alice, bobId, write(bob, aliceId), 2); read(alice, write(bob, aliceId), 2);
} }
private ContactId setUp(RemovableDriveIntegrationTestComponent device, private ContactId setUp(RemovableDriveIntegrationTestComponent device,
@@ -92,7 +92,7 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
@SuppressWarnings("SameParameterValue") @SuppressWarnings("SameParameterValue")
private void read(RemovableDriveIntegrationTestComponent device, private void read(RemovableDriveIntegrationTestComponent device,
ContactId contactId, File file, int deliveries) throws Exception { File file, int deliveries) throws Exception {
// Listen for message deliveries // Listen for message deliveries
MessageDeliveryListener listener = MessageDeliveryListener listener =
new MessageDeliveryListener(deliveries); new MessageDeliveryListener(deliveries);
@@ -100,8 +100,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
// Read the incoming stream // Read the incoming stream
TransportProperties p = new TransportProperties(); TransportProperties p = new TransportProperties();
p.put(PROP_PATH, file.getAbsolutePath()); p.put(PROP_PATH, file.getAbsolutePath());
RemovableDriveTask reader = device.getRemovableDriveManager() RemovableDriveTask reader =
.startReaderTask(contactId, p); device.getRemovableDriveManager().startReaderTask(p);
CountDownLatch disposedLatch = new CountDownLatch(1); CountDownLatch disposedLatch = new CountDownLatch(1);
reader.addObserver(state -> { reader.addObserver(state -> {
if (state.isFinished()) disposedLatch.countDown(); if (state.isFinished()) disposedLatch.countDown();

View File

@@ -59,15 +59,15 @@ class RemovableDriveViewModel extends AndroidViewModel {
return observe(manager.startWriterTask(contactId, p)); return observe(manager.startWriterTask(contactId, p));
} }
LiveData<State> read(ContactId contactId, Uri uri) { LiveData<State> read(Uri uri) {
TransportProperties p = new TransportProperties(); TransportProperties p = new TransportProperties();
p.put(PROP_URI, uri.toString()); p.put(PROP_URI, uri.toString());
return observe(manager.startReaderTask(contactId, p)); return observe(manager.startReaderTask(p));
} }
@Nullable @Nullable
LiveData<State> ongoingWrite(ContactId contactId) { LiveData<State> ongoingWrite() {
RemovableDriveTask task = manager.getCurrentWriterTask(contactId); RemovableDriveTask task = manager.getCurrentWriterTask();
if (task == null) { if (task == null) {
return null; return null;
} }
@@ -75,8 +75,8 @@ class RemovableDriveViewModel extends AndroidViewModel {
} }
@Nullable @Nullable
LiveData<State> ongoingRead(ContactId contactId) { LiveData<State> ongoingRead() {
RemovableDriveTask task = manager.getCurrentReaderTask(contactId); RemovableDriveTask task = manager.getCurrentReaderTask();
if (task == null) { if (task == null) {
return null; return null;
} }