Ensure that observers see the final state even if they're added late.

This commit is contained in:
akwizgran
2021-05-07 16:30:53 +01:00
committed by Torsten Grote
parent c010dd9401
commit dc2ad48a7f
5 changed files with 94 additions and 53 deletions

View File

@@ -1,6 +1,6 @@
package org.briarproject.bramble.api.plugin.file; package org.briarproject.bramble.api.plugin.file;
import org.briarproject.bramble.api.event.EventExecutor; import org.briarproject.bramble.api.Consumer;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.File; import java.io.File;
@@ -14,21 +14,43 @@ public interface RemovableDriveTask extends Runnable {
File getFile(); File getFile();
/** /**
* Adds an observer to the task. * Adds an observer to the task. The observer will be notified of state
* changes on the event thread. If the task has already finished, the
* observer will be notified of its final state.
*/ */
void addObserver(Observer o); void addObserver(Consumer<State> observer);
/** /**
* Removes an observer from the task. * Removes an observer from the task.
*/ */
void removeObserver(Observer o); void removeObserver(Consumer<State> observer);
interface Observer { class State {
@EventExecutor private final long done, total;
void onProgress(long done, long total); private final boolean finished, success;
@EventExecutor public State(long done, long total, boolean finished, boolean success) {
void onCompletion(boolean success); this.done = done;
this.total = total;
this.finished = finished;
this.success = success;
}
public long getDone() {
return done;
}
public long getTotal() {
return total;
}
public boolean isFinished() {
return finished;
}
public boolean isSuccess() {
return success;
}
} }
} }

View File

@@ -45,10 +45,10 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
if (r == null) { if (r == null) {
LOG.warning("Failed to create reader"); LOG.warning("Failed to create reader");
registry.removeReader(contactId, this); registry.removeReader(contactId, this);
visitObservers(o -> o.onCompletion(false)); setSuccess(false);
return; return;
} }
progressTotal.set(file.length()); setTotal(file.length());
eventBus.addListener(this); eventBus.addListener(this);
connectionManager.manageIncomingConnection(ID, new DecoratedReader(r)); connectionManager.manageIncomingConnection(ID, new DecoratedReader(r));
} }
@@ -59,7 +59,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
MessageAddedEvent m = (MessageAddedEvent) e; MessageAddedEvent m = (MessageAddedEvent) e;
if (contactId.equals(m.getContactId())) { if (contactId.equals(m.getContactId())) {
LOG.info("Message received"); LOG.info("Message received");
updateProgress(m.getMessage().getRawLength()); addDone(m.getMessage().getRawLength());
} }
} }
} }
@@ -83,7 +83,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl
delegate.dispose(exception, recognised); delegate.dispose(exception, recognised);
registry.removeReader(contactId, RemovableDriveReaderTask.this); registry.removeReader(contactId, RemovableDriveReaderTask.this);
eventBus.removeListener(RemovableDriveReaderTask.this); eventBus.removeListener(RemovableDriveReaderTask.this);
visitObservers(o -> o.onCompletion(!exception && recognised)); setSuccess(!exception && recognised);
} }
} }
} }

View File

@@ -11,11 +11,11 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportProperties;
import java.io.File; import java.io.File;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import static java.lang.Math.min; import static java.lang.Math.min;
@@ -34,9 +34,12 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
final RemovableDriveTaskRegistry registry; final RemovableDriveTaskRegistry registry;
final ContactId contactId; final ContactId contactId;
final File file; final File file;
private final List<Observer> observers = new CopyOnWriteArrayList<>();
final AtomicLong progressTotal = new AtomicLong(0); private final Object lock = new Object();
private final AtomicLong progressDone = new AtomicLong(0); @GuardedBy("lock")
private final List<Consumer<State>> observers = new ArrayList<>();
@GuardedBy("lock")
private State state = new State(0, 0, false, false);
RemovableDriveTaskImpl( RemovableDriveTaskImpl(
Executor eventExecutor, Executor eventExecutor,
@@ -61,19 +64,22 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
} }
@Override @Override
public void addObserver(Observer o) { public void addObserver(Consumer<State> o) {
observers.add(o); State state;
synchronized (lock) {
observers.add(o);
state = this.state;
}
if (state.isFinished()) {
eventExecutor.execute(() -> o.accept(state));
}
} }
@Override @Override
public void removeObserver(Observer o) { public void removeObserver(Consumer<State> o) {
observers.remove(o); synchronized (lock) {
} observers.remove(o);
}
void visitObservers(Consumer<Observer> visitor) {
eventExecutor.execute(() -> {
for (Observer o : observers) visitor.accept(o);
});
} }
SimplexPlugin getPlugin() { SimplexPlugin getPlugin() {
@@ -86,9 +92,37 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask {
return p; return p;
} }
void updateProgress(long progress) { void setTotal(long total) {
long done = progressDone.addAndGet(progress); synchronized (lock) {
long total = progressTotal.get(); state = new State(state.getDone(), total, state.isFinished(),
visitObservers(o -> o.onProgress(min(done, total), total)); state.isSuccess());
notifyObservers();
}
}
void addDone(long done) {
synchronized (lock) {
// Done and total come from different sources; make them consistent
done = min(state.getDone() + done, state.getTotal());
state = new State(done, state.getTotal(), state.isFinished(),
state.isSuccess());
}
notifyObservers();
}
void setSuccess(boolean success) {
synchronized (lock) {
state = new State(state.getDone(), state.getTotal(), true, success);
}
notifyObservers();
}
@GuardedBy("lock")
private void notifyObservers() {
List<Consumer<State>> observers = new ArrayList<>(this.observers);
State state = this.state;
eventExecutor.execute(() -> {
for (Consumer<State> o : observers) o.accept(state);
});
} }
} }

View File

@@ -51,7 +51,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
if (w == null) { if (w == null) {
LOG.warning("Failed to create writer"); LOG.warning("Failed to create writer");
registry.removeWriter(contactId, this); registry.removeWriter(contactId, this);
visitObservers(o -> o.onCompletion(false)); setSuccess(false);
return; return;
} }
// TODO: Get total bytes to send from DB // TODO: Get total bytes to send from DB
@@ -101,7 +101,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
delegate.dispose(exception); delegate.dispose(exception);
registry.removeWriter(contactId, RemovableDriveWriterTask.this); registry.removeWriter(contactId, RemovableDriveWriterTask.this);
eventBus.removeListener(RemovableDriveWriterTask.this); eventBus.removeListener(RemovableDriveWriterTask.this);
visitObservers(o -> o.onCompletion(!exception)); setSuccess(!exception);
} }
} }
} }

View File

@@ -11,7 +11,6 @@ import org.briarproject.bramble.api.identity.IdentityManager;
import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask; import org.briarproject.bramble.api.plugin.file.RemovableDriveTask;
import org.briarproject.bramble.api.plugin.file.RemovableDriveTask.Observer;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.TestDatabaseConfigModule; import org.briarproject.bramble.test.TestDatabaseConfigModule;
@@ -100,15 +99,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
RemovableDriveTask reader = device.getRemovableDriveManager() RemovableDriveTask reader = device.getRemovableDriveManager()
.startReaderTask(contactId, file); .startReaderTask(contactId, file);
CountDownLatch disposedLatch = new CountDownLatch(1); CountDownLatch disposedLatch = new CountDownLatch(1);
reader.addObserver(new Observer() { reader.addObserver(state -> {
@Override if (state.isFinished()) disposedLatch.countDown();
public void onProgress(long done, long total) {
}
@Override
public void onCompletion(boolean success) {
disposedLatch.countDown();
}
}); });
// Wait for the messages to be delivered // Wait for the messages to be delivered
assertTrue(listener.delivered.await(TIMEOUT_MS, MILLISECONDS)); assertTrue(listener.delivered.await(TIMEOUT_MS, MILLISECONDS));
@@ -125,15 +117,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase {
RemovableDriveTask writer = device.getRemovableDriveManager() RemovableDriveTask writer = device.getRemovableDriveManager()
.startWriterTask(contactId, file); .startWriterTask(contactId, file);
CountDownLatch disposedLatch = new CountDownLatch(1); CountDownLatch disposedLatch = new CountDownLatch(1);
writer.addObserver(new Observer() { writer.addObserver(state -> {
@Override if (state.isFinished()) disposedLatch.countDown();
public void onProgress(long done, long total) {
}
@Override
public void onCompletion(boolean success) {
disposedLatch.countDown();
}
}); });
// Wait for the writer to be disposed // Wait for the writer to be disposed
disposedLatch.await(TIMEOUT_MS, MILLISECONDS); disposedLatch.await(TIMEOUT_MS, MILLISECONDS);