Refactor removable drive tasks.

This commit is contained in:
akwizgran
2021-06-15 11:44:10 +01:00
committed by Torsten Grote
parent d7238312b1
commit 5e98bd0b53
11 changed files with 78 additions and 94 deletions

View File

@@ -10,32 +10,31 @@ import javax.annotation.Nullable;
public interface RemovableDriveManager {
/**
* Returns the currently running reader task for the given contact,
* or null if no task is running.
* Returns the currently running reader task, or null if no reader task
* is running.
*/
@Nullable
RemovableDriveTask getCurrentReaderTask(ContactId c);
RemovableDriveTask getCurrentReaderTask();
/**
* Returns the currently running writer task for the given contact,
* or null if no task is running.
* Returns the currently running writer task, or null if no writer task
* is running.
*/
@Nullable
RemovableDriveTask getCurrentWriterTask(ContactId c);
RemovableDriveTask getCurrentWriterTask();
/**
* Starts and returns a reader task for the given contact, reading from
* a stream described by the given transport properties. If a reader task
* for the contact is already running, it will be returned and the
* transport properties will be ignored.
* Starts and returns a reader task, reading from a stream described by
* the given transport properties. If a reader task is already running,
* it will be returned and the argument will be ignored.
*/
RemovableDriveTask startReaderTask(ContactId c, TransportProperties p);
RemovableDriveTask startReaderTask(TransportProperties p);
/**
* Starts and returns a writer task for the given contact, writing to
* a stream described by the given transport properties. If a writer task
* for the contact is already running, it will be returned and the
* transport properties will be ignored.
* is already running, it will be returned and the arguments will be
* ignored.
*/
RemovableDriveTask startWriterTask(ContactId c, TransportProperties p);
}

View File

@@ -39,7 +39,7 @@ public interface RemovableDriveTask extends Runnable {
/**
* Returns the total length in bytes of the messages read or written
* so far.
* so far, or zero if the total is unknown.
*/
public long getDone() {
return done;

View File

@@ -7,10 +7,10 @@ import org.briarproject.bramble.api.plugin.file.RemovableDriveManager;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
import org.briarproject.bramble.api.properties.TransportProperties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
@@ -21,10 +21,12 @@ class RemovableDriveManagerImpl
private final Executor ioExecutor;
private final RemovableDriveTaskFactory taskFactory;
private final ConcurrentHashMap<ContactId, RemovableDriveTask>
readers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<ContactId, RemovableDriveTask>
writers = new ConcurrentHashMap<>();
private final Object lock = new Object();
@GuardedBy("lock")
private RemovableDriveTask reader = null;
@GuardedBy("lock")
private RemovableDriveTask writer = null;
@Inject
RemovableDriveManagerImpl(@IoExecutor Executor ioExecutor,
@@ -35,49 +37,54 @@ class RemovableDriveManagerImpl
@Nullable
@Override
public RemovableDriveTask getCurrentReaderTask(ContactId c) {
return readers.get(c);
public RemovableDriveTask getCurrentReaderTask() {
synchronized (lock) {
return reader;
}
}
@Nullable
@Override
public RemovableDriveTask getCurrentWriterTask(ContactId c) {
return writers.get(c);
public RemovableDriveTask getCurrentWriterTask() {
synchronized (lock) {
return writer;
}
}
@Override
public RemovableDriveTask startReaderTask(ContactId c,
TransportProperties p) {
RemovableDriveTask task = taskFactory.createReader(this, c, p);
RemovableDriveTask old = readers.putIfAbsent(c, task);
if (old == null) {
ioExecutor.execute(task);
return task;
} else {
return old;
public RemovableDriveTask startReaderTask(TransportProperties p) {
RemovableDriveTask created;
synchronized (lock) {
if (reader != null) return reader;
reader = created = taskFactory.createReader(this, p);
}
ioExecutor.execute(created);
return created;
}
@Override
public RemovableDriveTask startWriterTask(ContactId c,
TransportProperties p) {
RemovableDriveTask task = taskFactory.createWriter(this, c, p);
RemovableDriveTask old = writers.putIfAbsent(c, task);
if (old == null) {
ioExecutor.execute(task);
return task;
} else {
return old;
RemovableDriveTask created;
synchronized (lock) {
if (writer != null) return writer;
writer = created = taskFactory.createWriter(this, c, p);
}
ioExecutor.execute(created);
return created;
}
@Override
public void removeReader(RemovableDriveTask task) {
synchronized (lock) {
if (reader == task) reader = null;
}
}
@Override
public void removeReader(ContactId c, RemovableDriveTask task) {
readers.remove(c, task);
}
@Override
public void removeWriter(ContactId c, RemovableDriveTask task) {
writers.remove(c, task);
public void removeWriter(RemovableDriveTask task) {
synchronized (lock) {
if (writer == task) writer = null;
}
}
}

View File

@@ -1,15 +1,11 @@
package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import java.io.IOException;
import java.io.InputStream;
@@ -20,8 +16,7 @@ import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.plugin.file.RemovableDriveConstants.ID;
@NotNullByDefault
class RemovableDriveReaderTask extends RemovableDriveTaskImpl
implements EventListener {
class RemovableDriveReaderTask extends RemovableDriveTaskImpl {
private final static Logger LOG =
getLogger(RemovableDriveReaderTask.class.getName());
@@ -32,10 +27,9 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
ConnectionManager connectionManager,
EventBus eventBus,
RemovableDriveTaskRegistry registry,
ContactId contactId,
TransportProperties transportProperties) {
super(eventExecutor, pluginManager, connectionManager, eventBus,
registry, contactId, transportProperties);
registry, transportProperties);
}
@Override
@@ -44,25 +38,13 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
getPlugin().createReader(transportProperties);
if (r == null) {
LOG.warning("Failed to create reader");
registry.removeReader(contactId, this);
registry.removeReader(this);
setSuccess(false);
return;
}
eventBus.addListener(this);
connectionManager.manageIncomingConnection(ID, new DecoratedReader(r));
}
@Override
public void eventOccurred(Event e) {
if (e instanceof MessageAddedEvent) {
MessageAddedEvent m = (MessageAddedEvent) e;
if (contactId.equals(m.getContactId())) {
LOG.info("Message received");
addDone(m.getMessage().getRawLength());
}
}
}
private class DecoratedReader implements TransportConnectionReader {
private final TransportConnectionReader delegate;
@@ -80,8 +62,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
public void dispose(boolean exception, boolean recognised)
throws IOException {
delegate.dispose(exception, recognised);
registry.removeReader(contactId, RemovableDriveReaderTask.this);
eventBus.removeListener(RemovableDriveReaderTask.this);
registry.removeReader(RemovableDriveReaderTask.this);
setSuccess(!exception && recognised);
}
}

View File

@@ -9,7 +9,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
interface RemovableDriveTaskFactory {
RemovableDriveTask createReader(RemovableDriveTaskRegistry registry,
ContactId c, TransportProperties p);
TransportProperties p);
RemovableDriveTask createWriter(RemovableDriveTaskRegistry registry,
ContactId c, TransportProperties p);

View File

@@ -41,9 +41,9 @@ class RemovableDriveTaskFactoryImpl implements RemovableDriveTaskFactory {
@Override
public RemovableDriveTask createReader(RemovableDriveTaskRegistry registry,
ContactId c, TransportProperties p) {
TransportProperties p) {
return new RemovableDriveReaderTask(eventExecutor, pluginManager,
connectionManager, eventBus, registry, c, p);
connectionManager, eventBus, registry, p);
}
@Override

View File

@@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.Consumer;
import org.briarproject.bramble.api.connection.ConnectionManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.PluginManager;
@@ -30,7 +29,6 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
final ConnectionManager connectionManager;
final EventBus eventBus;
final RemovableDriveTaskRegistry registry;
final ContactId contactId;
final TransportProperties transportProperties;
private final Object lock = new Object();
@@ -45,14 +43,12 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
ConnectionManager connectionManager,
EventBus eventBus,
RemovableDriveTaskRegistry registry,
ContactId contactId,
TransportProperties transportProperties) {
this.eventExecutor = eventExecutor;
this.pluginManager = pluginManager;
this.connectionManager = connectionManager;
this.eventBus = eventBus;
this.registry = registry;
this.contactId = contactId;
this.transportProperties = transportProperties;
}

View File

@@ -1,13 +1,12 @@
package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
@NotNullByDefault
interface RemovableDriveTaskRegistry {
void removeReader(ContactId c, RemovableDriveTask task);
void removeReader(RemovableDriveTask task);
void removeWriter(ContactId c, RemovableDriveTask task);
void removeWriter(RemovableDriveTask task);
}

View File

@@ -33,6 +33,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
getLogger(RemovableDriveWriterTask.class.getName());
private final DatabaseComponent db;
private final ContactId contactId;
RemovableDriveWriterTask(
DatabaseComponent db,
@@ -44,8 +45,9 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
ContactId contactId,
TransportProperties transportProperties) {
super(eventExecutor, pluginManager, connectionManager, eventBus,
registry, contactId, transportProperties);
registry, transportProperties);
this.db = db;
this.contactId = contactId;
}
@Override
@@ -54,7 +56,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
TransportConnectionWriter w = plugin.createWriter(transportProperties);
if (w == null) {
LOG.warning("Failed to create writer");
registry.removeWriter(contactId, this);
registry.removeWriter(this);
setSuccess(false);
return;
}
@@ -64,7 +66,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
db.getMessageBytesToSend(txn, contactId, maxLatency)));
} catch (DbException e) {
logException(LOG, WARNING, e);
registry.removeWriter(contactId, this);
registry.removeWriter(this);
setSuccess(false);
return;
}
@@ -112,7 +114,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
@Override
public void dispose(boolean exception) throws IOException {
delegate.dispose(exception);
registry.removeWriter(contactId, RemovableDriveWriterTask.this);
registry.removeWriter(RemovableDriveWriterTask.this);
eventBus.removeListener(RemovableDriveWriterTask.this);
setSuccess(!exception);
}

View File

@@ -70,9 +70,9 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
ContactId aliceId = setUp(bob, bobIdentity,
aliceIdentity.getLocalAuthor(), false);
// Sync Alice's client versions and transport properties
read(bob, aliceId, write(alice, bobId), 2);
read(bob, write(alice, bobId), 2);
// Sync Bob's client versions and transport properties
read(alice, bobId, write(bob, aliceId), 2);
read(alice, write(bob, aliceId), 2);
}
private ContactId setUp(RemovableDriveIntegrationTestComponent device,
@@ -92,7 +92,7 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
@SuppressWarnings("SameParameterValue")
private void read(RemovableDriveIntegrationTestComponent device,
ContactId contactId, File file, int deliveries) throws Exception {
File file, int deliveries) throws Exception {
// Listen for message deliveries
MessageDeliveryListener listener =
new MessageDeliveryListener(deliveries);
@@ -100,8 +100,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
// Read the incoming stream
TransportProperties p = new TransportProperties();
p.put(PROP_PATH, file.getAbsolutePath());
RemovableDriveTask reader = device.getRemovableDriveManager()
.startReaderTask(contactId, p);
RemovableDriveTask reader =
device.getRemovableDriveManager().startReaderTask(p);
CountDownLatch disposedLatch = new CountDownLatch(1);
reader.addObserver(state -> {
if (state.isFinished()) disposedLatch.countDown();

View File

@@ -59,15 +59,15 @@ class RemovableDriveViewModel extends AndroidViewModel {
return observe(manager.startWriterTask(contactId, p));
}
LiveData<State> read(ContactId contactId, Uri uri) {
LiveData<State> read(Uri uri) {
TransportProperties p = new TransportProperties();
p.put(PROP_URI, uri.toString());
return observe(manager.startReaderTask(contactId, p));
return observe(manager.startReaderTask(p));
}
@Nullable
LiveData<State> ongoingWrite(ContactId contactId) {
RemovableDriveTask task = manager.getCurrentWriterTask(contactId);
LiveData<State> ongoingWrite() {
RemovableDriveTask task = manager.getCurrentWriterTask();
if (task == null) {
return null;
}
@@ -75,8 +75,8 @@ class RemovableDriveViewModel extends AndroidViewModel {
}
@Nullable
LiveData<State> ongoingRead(ContactId contactId) {
RemovableDriveTask task = manager.getCurrentReaderTask(contactId);
LiveData<State> ongoingRead() {
RemovableDriveTask task = manager.getCurrentReaderTask();
if (task == null) {
return null;
}