From a198e7d08eadd8ddb9bfa63afa542f7bbb4a24a1 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Fri, 7 May 2021 16:30:53 +0100 Subject: [PATCH] Ensure that observers see the final state even if they're added late. --- .../api/plugin/file/RemovableDriveTask.java | 40 ++++++++--- .../plugin/file/RemovableDriveReaderTask.java | 8 +-- .../plugin/file/RemovableDriveTaskImpl.java | 72 ++++++++++++++----- .../plugin/file/RemovableDriveWriterTask.java | 4 +- .../file/RemovableDriveIntegrationTest.java | 23 ++---- 5 files changed, 94 insertions(+), 53 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java index c4f2e00a9..5388dab31 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/file/RemovableDriveTask.java @@ -1,6 +1,6 @@ package org.briarproject.bramble.api.plugin.file; -import org.briarproject.bramble.api.event.EventExecutor; +import org.briarproject.bramble.api.Consumer; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import java.io.File; @@ -14,21 +14,43 @@ public interface RemovableDriveTask extends Runnable { File getFile(); /** - * Adds an observer to the task. + * Adds an observer to the task. The observer will be notified of state + * changes on the event thread. If the task has already finished, the + * observer will be notified of its final state. */ - void addObserver(Observer o); + void addObserver(Consumer observer); /** * Removes an observer from the task. */ - void removeObserver(Observer o); + void removeObserver(Consumer observer); - interface Observer { + class State { - @EventExecutor - void onProgress(long done, long total); + private final long done, total; + private final boolean finished, success; - @EventExecutor - void onCompletion(boolean success); + public State(long done, long total, boolean finished, boolean success) { + this.done = done; + this.total = total; + this.finished = finished; + this.success = success; + } + + public long getDone() { + return done; + } + + public long getTotal() { + return total; + } + + public boolean isFinished() { + return finished; + } + + public boolean isSuccess() { + return success; + } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java index e51743332..cc93c9099 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveReaderTask.java @@ -45,10 +45,10 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl if (r == null) { LOG.warning("Failed to create reader"); registry.removeReader(contactId, this); - visitObservers(o -> o.onCompletion(false)); + setSuccess(false); return; } - progressTotal.set(file.length()); + setTotal(file.length()); eventBus.addListener(this); connectionManager.manageIncomingConnection(ID, new DecoratedReader(r)); } @@ -59,7 +59,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl MessageAddedEvent m = (MessageAddedEvent) e; if (contactId.equals(m.getContactId())) { LOG.info("Message received"); - updateProgress(m.getMessage().getRawLength()); + addDone(m.getMessage().getRawLength()); } } } @@ -83,7 +83,7 @@ class RemovableDriveReaderTask extends RemovableDriveTaskImpl delegate.dispose(exception, recognised); registry.removeReader(contactId, RemovableDriveReaderTask.this); eventBus.removeListener(RemovableDriveReaderTask.this); - visitObservers(o -> o.onCompletion(!exception && recognised)); + setSuccess(!exception && recognised); } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java index f7a919ed7..de792f644 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveTaskImpl.java @@ -11,11 +11,11 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import org.briarproject.bramble.api.properties.TransportProperties; import java.io.File; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import static java.lang.Math.min; @@ -34,9 +34,12 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask { final RemovableDriveTaskRegistry registry; final ContactId contactId; final File file; - private final List observers = new CopyOnWriteArrayList<>(); - final AtomicLong progressTotal = new AtomicLong(0); - private final AtomicLong progressDone = new AtomicLong(0); + + private final Object lock = new Object(); + @GuardedBy("lock") + private final List> observers = new ArrayList<>(); + @GuardedBy("lock") + private State state = new State(0, 0, false, false); RemovableDriveTaskImpl( Executor eventExecutor, @@ -61,19 +64,22 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask { } @Override - public void addObserver(Observer o) { - observers.add(o); + public void addObserver(Consumer o) { + State state; + synchronized (lock) { + observers.add(o); + state = this.state; + } + if (state.isFinished()) { + eventExecutor.execute(() -> o.accept(state)); + } } @Override - public void removeObserver(Observer o) { - observers.remove(o); - } - - void visitObservers(Consumer visitor) { - eventExecutor.execute(() -> { - for (Observer o : observers) visitor.accept(o); - }); + public void removeObserver(Consumer o) { + synchronized (lock) { + observers.remove(o); + } } SimplexPlugin getPlugin() { @@ -86,9 +92,37 @@ abstract class RemovableDriveTaskImpl implements RemovableDriveTask { return p; } - void updateProgress(long progress) { - long done = progressDone.addAndGet(progress); - long total = progressTotal.get(); - visitObservers(o -> o.onProgress(min(done, total), total)); + void setTotal(long total) { + synchronized (lock) { + state = new State(state.getDone(), total, state.isFinished(), + state.isSuccess()); + notifyObservers(); + } + } + + void addDone(long done) { + synchronized (lock) { + // Done and total come from different sources; make them consistent + done = min(state.getDone() + done, state.getTotal()); + state = new State(done, state.getTotal(), state.isFinished(), + state.isSuccess()); + } + notifyObservers(); + } + + void setSuccess(boolean success) { + synchronized (lock) { + state = new State(state.getDone(), state.getTotal(), true, success); + } + notifyObservers(); + } + + @GuardedBy("lock") + private void notifyObservers() { + List> observers = new ArrayList<>(this.observers); + State state = this.state; + eventExecutor.execute(() -> { + for (Consumer o : observers) o.accept(state); + }); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java index 671921607..ec02e417b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java @@ -51,7 +51,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl if (w == null) { LOG.warning("Failed to create writer"); registry.removeWriter(contactId, this); - visitObservers(o -> o.onCompletion(false)); + setSuccess(false); return; } // TODO: Get total bytes to send from DB @@ -101,7 +101,7 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl delegate.dispose(exception); registry.removeWriter(contactId, RemovableDriveWriterTask.this); eventBus.removeListener(RemovableDriveWriterTask.this); - visitObservers(o -> o.onCompletion(!exception)); + setSuccess(!exception); } } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/file/RemovableDriveIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/file/RemovableDriveIntegrationTest.java index acd4844dc..56a40bcbe 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/file/RemovableDriveIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/file/RemovableDriveIntegrationTest.java @@ -11,7 +11,6 @@ import org.briarproject.bramble.api.identity.IdentityManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.file.RemovableDriveTask; -import org.briarproject.bramble.api.plugin.file.RemovableDriveTask.Observer; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.TestDatabaseConfigModule; @@ -100,15 +99,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase { RemovableDriveTask reader = device.getRemovableDriveManager() .startReaderTask(contactId, file); CountDownLatch disposedLatch = new CountDownLatch(1); - reader.addObserver(new Observer() { - @Override - public void onProgress(long done, long total) { - } - - @Override - public void onCompletion(boolean success) { - disposedLatch.countDown(); - } + reader.addObserver(state -> { + if (state.isFinished()) disposedLatch.countDown(); }); // Wait for the messages to be delivered assertTrue(listener.delivered.await(TIMEOUT_MS, MILLISECONDS)); @@ -125,15 +117,8 @@ public class RemovableDriveIntegrationTest extends BrambleTestCase { RemovableDriveTask writer = device.getRemovableDriveManager() .startWriterTask(contactId, file); CountDownLatch disposedLatch = new CountDownLatch(1); - writer.addObserver(new Observer() { - @Override - public void onProgress(long done, long total) { - } - - @Override - public void onCompletion(boolean success) { - disposedLatch.countDown(); - } + writer.addObserver(state -> { + if (state.isFinished()) disposedLatch.countDown(); }); // Wait for the writer to be disposed disposedLatch.await(TIMEOUT_MS, MILLISECONDS);