Ensure that observers see the final state even if they're added late.

This commit is contained in:
akwizgran
2021-05-07 16:30:53 +01:00
parent bca6f1506e
commit a198e7d08e
5 changed files with 94 additions and 53 deletions

View File

@@ -1,6 +1,6 @@
package org.briarproject.bramble.api.plugin.file;
import org.briarproject.bramble.api.event.EventExecutor;
import org.briarproject.bramble.api.Consumer;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.File;
@@ -14,21 +14,43 @@ public interface RemovableDriveTask extends Runnable {
File getFile();
/**
* Adds an observer to the task.
* Adds an observer to the task. The observer will be notified of state
* changes on the event thread. If the task has already finished, the
* observer will be notified of its final state.
*/
void addObserver(Observer o);
void addObserver(Consumer<State> observer);
/**
* Removes an observer from the task.
*/
void removeObserver(Observer o);
void removeObserver(Consumer<State> observer);
interface Observer {
class State {
@EventExecutor
void onProgress(long done, long total);
private final long done, total;
private final boolean finished, success;
@EventExecutor
void onCompletion(boolean success);
public State(long done, long total, boolean finished, boolean success) {
this.done = done;
this.total = total;
this.finished = finished;
this.success = success;
}
public long getDone() {
return done;
}
public long getTotal() {
return total;
}
public boolean isFinished() {
return finished;
}
public boolean isSuccess() {
return success;
}
}
}

View File

@@ -45,10 +45,10 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
if (r == null) {
LOG.warning("Failed to create reader");
registry.removeReader(contactId, this);
visitObservers(o -> o.onCompletion(false));
setSuccess(false);
return;
}
progressTotal.set(file.length());
setTotal(file.length());
eventBus.addListener(this);
connectionManager.manageIncomingConnection(ID, new DecoratedReader(r));
}
@@ -59,7 +59,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
MessageAddedEvent m = (MessageAddedEvent) e;
if (contactId.equals(m.getContactId())) {
LOG.info("Message received");
updateProgress(m.getMessage().getRawLength());
addDone(m.getMessage().getRawLength());
}
}
}
@@ -83,7 +83,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
delegate.dispose(exception, recognised);
registry.removeReader(contactId, RemovableDriveReaderTask.this);
eventBus.removeListener(RemovableDriveReaderTask.this);
visitObservers(o -> o.onCompletion(!exception && recognised));
setSuccess(!exception && recognised);
}
}
}

View File

@@ -11,11 +11,11 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import org.briarproject.bramble.api.properties.TransportProperties;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import static java.lang.Math.min;
@@ -34,9 +34,12 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
final RemovableDriveTaskRegistry registry;
final ContactId contactId;
final File file;
private final List<Observer> observers = new CopyOnWriteArrayList<>();
final AtomicLong progressTotal = new AtomicLong(0);
private final AtomicLong progressDone = new AtomicLong(0);
private final Object lock = new Object();
@GuardedBy("lock")
private final List<Consumer<State>> observers = new ArrayList<>();
@GuardedBy("lock")
private State state = new State(0, 0, false, false);
RemovableDriveTaskImpl(
Executor eventExecutor,
@@ -61,19 +64,22 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
}
@Override
public void addObserver(Observer o) {
observers.add(o);
public void addObserver(Consumer<State> o) {
State state;
synchronized (lock) {
observers.add(o);
state = this.state;
}
if (state.isFinished()) {
eventExecutor.execute(() -> o.accept(state));
}
}
@Override
public void removeObserver(Observer o) {
observers.remove(o);
}
void visitObservers(Consumer<Observer> visitor) {
eventExecutor.execute(() -> {
for (Observer o : observers) visitor.accept(o);
});
public void removeObserver(Consumer<State> o) {
synchronized (lock) {
observers.remove(o);
}
}
SimplexPlugin getPlugin() {
@@ -86,9 +92,37 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
return p;
}
void updateProgress(long progress) {
long done = progressDone.addAndGet(progress);
long total = progressTotal.get();
visitObservers(o -> o.onProgress(min(done, total), total));
void setTotal(long total) {
synchronized (lock) {
state = new State(state.getDone(), total, state.isFinished(),
state.isSuccess());
notifyObservers();
}
}
void addDone(long done) {
synchronized (lock) {
// Done and total come from different sources; make them consistent
done = min(state.getDone() + done, state.getTotal());
state = new State(done, state.getTotal(), state.isFinished(),
state.isSuccess());
}
notifyObservers();
}
void setSuccess(boolean success) {
synchronized (lock) {
state = new State(state.getDone(), state.getTotal(), true, success);
}
notifyObservers();
}
@GuardedBy("lock")
private void notifyObservers() {
List<Consumer<State>> observers = new ArrayList<>(this.observers);
State state = this.state;
eventExecutor.execute(() -> {
for (Consumer<State> o : observers) o.accept(state);
});
}
}

View File

@@ -51,7 +51,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
if (w == null) {
LOG.warning("Failed to create writer");
registry.removeWriter(contactId, this);
visitObservers(o -> o.onCompletion(false));
setSuccess(false);
return;
}
// TODO: Get total bytes to send from DB
@@ -101,7 +101,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
delegate.dispose(exception);
registry.removeWriter(contactId, RemovableDriveWriterTask.this);
eventBus.removeListener(RemovableDriveWriterTask.this);
visitObservers(o -> o.onCompletion(!exception));
setSuccess(!exception);
}
}
}

View File

@@ -11,7 +11,6 @@ import org.briarproject.bramble.api.identity.IdentityManager;
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask.Observer;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.TestDatabaseConfigModule;
@@ -100,15 +99,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
RemovableDriveTask reader = device.getRemovableDriveManager()
.startReaderTask(contactId, file);
CountDownLatch disposedLatch = new CountDownLatch(1);
reader.addObserver(new Observer() {
@Override
public void onProgress(long done, long total) {
}
@Override
public void onCompletion(boolean success) {
disposedLatch.countDown();
}
reader.addObserver(state -> {
if (state.isFinished()) disposedLatch.countDown();
});
// Wait for the messages to be delivered
assertTrue(listener.delivered.await(TIMEOUT_MS, MILLISECONDS));
@@ -125,15 +117,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
RemovableDriveTask writer = device.getRemovableDriveManager()
.startWriterTask(contactId, file);
CountDownLatch disposedLatch = new CountDownLatch(1);
writer.addObserver(new Observer() {
@Override
public void onProgress(long done, long total) {
}
@Override
public void onCompletion(boolean success) {
disposedLatch.countDown();
}
writer.addObserver(state -> {
if (state.isFinished()) disposedLatch.countDown();
});
// Wait for the writer to be disposed
disposedLatch.await(TIMEOUT_MS, MILLISECONDS);