Don't create files for upload while directly connected to contact.

This commit is contained in:
akwizgran
2022-08-10 12:37:38 +01:00
parent a1f25c8101
commit 24d4debde0
3 changed files with 303 additions and 45 deletions

View File

@@ -1,6 +1,7 @@
package org.briarproject.bramble.mailbox; package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.Cancellable; import org.briarproject.bramble.api.Cancellable;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.db.DbException;
@@ -12,6 +13,8 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.mailbox.MailboxFolderId; import org.briarproject.bramble.api.mailbox.MailboxFolderId;
import org.briarproject.bramble.api.mailbox.MailboxProperties; import org.briarproject.bramble.api.mailbox.MailboxProperties;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
@@ -32,6 +35,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import static java.lang.Boolean.TRUE;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
@@ -58,9 +62,16 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
* <p> * <p>
* If there's no data to send, the worker listens for events indicating * If there's no data to send, the worker listens for events indicating
* that new data may be ready to send. * that new data may be ready to send.
* <p>
* Whenever we're directly connected to the contact, the worker doesn't
* check for data to send or start connectivity checks until the contact
* disconnects. However, if the worker has already started writing and
* uploading a file when the contact connects, the worker will finish the
* upload.
*/ */
private enum State { private enum State {
CREATED, CREATED,
CONNECTED_TO_CONTACT,
CHECKING_FOR_DATA, CHECKING_FOR_DATA,
WAITING_FOR_DATA, WAITING_FOR_DATA,
CONNECTIVITY_CHECK, CONNECTIVITY_CHECK,
@@ -95,6 +106,7 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
private final Clock clock; private final Clock clock;
private final TaskScheduler taskScheduler; private final TaskScheduler taskScheduler;
private final EventBus eventBus; private final EventBus eventBus;
private final ConnectionRegistry connectionRegistry;
private final ConnectivityChecker connectivityChecker; private final ConnectivityChecker connectivityChecker;
private final MailboxApiCaller mailboxApiCaller; private final MailboxApiCaller mailboxApiCaller;
private final MailboxApi mailboxApi; private final MailboxApi mailboxApi;
@@ -121,6 +133,7 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
Clock clock, Clock clock,
TaskScheduler taskScheduler, TaskScheduler taskScheduler,
EventBus eventBus, EventBus eventBus,
ConnectionRegistry connectionRegistry,
ConnectivityChecker connectivityChecker, ConnectivityChecker connectivityChecker,
MailboxApiCaller mailboxApiCaller, MailboxApiCaller mailboxApiCaller,
MailboxApi mailboxApi, MailboxApi mailboxApi,
@@ -133,6 +146,7 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
this.clock = clock; this.clock = clock;
this.taskScheduler = taskScheduler; this.taskScheduler = taskScheduler;
this.eventBus = eventBus; this.eventBus = eventBus;
this.connectionRegistry = connectionRegistry;
this.connectivityChecker = connectivityChecker; this.connectivityChecker = connectivityChecker;
this.mailboxApiCaller = mailboxApiCaller; this.mailboxApiCaller = mailboxApiCaller;
this.mailboxApi = mailboxApi; this.mailboxApi = mailboxApi;
@@ -182,6 +196,12 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
synchronized (lock) { synchronized (lock) {
checkTask = null; checkTask = null;
if (state != State.CHECKING_FOR_DATA) return; if (state != State.CHECKING_FOR_DATA) return;
// Check whether we're directly connected to the contact. Calling
// this while holding the lock isn't ideal, but it avoids races
if (connectionRegistry.isConnected(contactId)) {
state = State.CONNECTED_TO_CONTACT;
return;
}
} }
LOG.info("Checking for data to send"); LOG.info("Checking for data to send");
try { try {
@@ -364,8 +384,11 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
onDataToSend(); onDataToSend();
} }
} else if (e instanceof MessageSharedEvent) { } else if (e instanceof MessageSharedEvent) {
LOG.info("Message shared"); MessageSharedEvent m = (MessageSharedEvent) e;
onDataToSend(); if (m.getGroupVisibility().get(contactId) == TRUE) {
LOG.info("Message shared");
onDataToSend();
}
} else if (e instanceof GroupVisibilityUpdatedEvent) { } else if (e instanceof GroupVisibilityUpdatedEvent) {
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e; GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
if (g.getVisibility() == SHARED && if (g.getVisibility() == SHARED &&
@@ -373,6 +396,18 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
LOG.info("Group shared"); LOG.info("Group shared");
onDataToSend(); onDataToSend();
} }
} else if (e instanceof ContactConnectedEvent) {
ContactConnectedEvent c = (ContactConnectedEvent) e;
if (c.getContactId().equals(contactId)) {
LOG.info("Contact connected");
onContactConnected();
}
} else if (e instanceof ContactDisconnectedEvent) {
ContactDisconnectedEvent c = (ContactDisconnectedEvent) e;
if (c.getContactId().equals(contactId)) {
LOG.info("Contact disconnected");
onContactDisconnected();
}
} }
} }
@@ -391,4 +426,36 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
// If we had scheduled a wakeup when data was due to be sent, cancel it // If we had scheduled a wakeup when data was due to be sent, cancel it
if (wakeupTask != null) wakeupTask.cancel(); if (wakeupTask != null) wakeupTask.cancel();
} }
@EventExecutor
private void onContactConnected() {
Cancellable wakeupTask = null, checkTask = null;
synchronized (lock) {
if (state == State.DESTROYED) return;
// If we're checking for data to send, waiting for data to send,
// or checking connectivity then wait until we disconnect from
// the contact before proceeding. If we're writing or uploading
// a file then continue
if (state == State.CHECKING_FOR_DATA ||
state == State.WAITING_FOR_DATA ||
state == State.CONNECTIVITY_CHECK) {
state = State.CONNECTED_TO_CONTACT;
wakeupTask = this.wakeupTask;
this.wakeupTask = null;
checkTask = this.checkTask;
this.checkTask = null;
}
}
if (wakeupTask != null) wakeupTask.cancel();
if (checkTask != null) checkTask.cancel();
}
@EventExecutor
private void onContactDisconnected() {
synchronized (lock) {
if (state != State.CONNECTED_TO_CONTACT) return;
state = State.CHECKING_FOR_DATA;
}
ioExecutor.execute(this::checkForDataToSend);
}
} }

View File

@@ -1,5 +1,6 @@
package org.briarproject.bramble.mailbox; package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
@@ -25,6 +26,7 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
private final Clock clock; private final Clock clock;
private final TaskScheduler taskScheduler; private final TaskScheduler taskScheduler;
private final EventBus eventBus; private final EventBus eventBus;
private final ConnectionRegistry connectionRegistry;
private final MailboxApiCaller mailboxApiCaller; private final MailboxApiCaller mailboxApiCaller;
private final MailboxApi mailboxApi; private final MailboxApi mailboxApi;
private final MailboxFileManager mailboxFileManager; private final MailboxFileManager mailboxFileManager;
@@ -36,6 +38,7 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
Clock clock, Clock clock,
TaskScheduler taskScheduler, TaskScheduler taskScheduler,
EventBus eventBus, EventBus eventBus,
ConnectionRegistry connectionRegistry,
MailboxApiCaller mailboxApiCaller, MailboxApiCaller mailboxApiCaller,
MailboxApi mailboxApi, MailboxApi mailboxApi,
MailboxFileManager mailboxFileManager, MailboxFileManager mailboxFileManager,
@@ -45,6 +48,7 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
this.clock = clock; this.clock = clock;
this.taskScheduler = taskScheduler; this.taskScheduler = taskScheduler;
this.eventBus = eventBus; this.eventBus = eventBus;
this.connectionRegistry = connectionRegistry;
this.mailboxApiCaller = mailboxApiCaller; this.mailboxApiCaller = mailboxApiCaller;
this.mailboxApi = mailboxApi; this.mailboxApi = mailboxApi;
this.mailboxFileManager = mailboxFileManager; this.mailboxFileManager = mailboxFileManager;
@@ -57,9 +61,9 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
MailboxProperties properties, MailboxFolderId folderId, MailboxProperties properties, MailboxFolderId folderId,
ContactId contactId) { ContactId contactId) {
MailboxUploadWorker worker = new MailboxUploadWorker(ioExecutor, db, MailboxUploadWorker worker = new MailboxUploadWorker(ioExecutor, db,
clock, taskScheduler, eventBus, connectivityChecker, clock, taskScheduler, eventBus, connectionRegistry,
mailboxApiCaller, mailboxApi, mailboxFileManager, connectivityChecker, mailboxApiCaller, mailboxApi,
properties, folderId, contactId); mailboxFileManager, properties, folderId, contactId);
eventBus.addListener(worker); eventBus.addListener(worker);
return worker; return worker;
} }

View File

@@ -1,12 +1,16 @@
package org.briarproject.bramble.mailbox; package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.Cancellable; import org.briarproject.bramble.api.Cancellable;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.mailbox.MailboxFolderId; import org.briarproject.bramble.api.mailbox.MailboxFolderId;
import org.briarproject.bramble.api.mailbox.MailboxProperties; import org.briarproject.bramble.api.mailbox.MailboxProperties;
import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
@@ -25,10 +29,12 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS; import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY; import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY;
@@ -50,6 +56,8 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
private final TaskScheduler taskScheduler = private final TaskScheduler taskScheduler =
context.mock(TaskScheduler.class); context.mock(TaskScheduler.class);
private final EventBus eventBus = context.mock(EventBus.class); private final EventBus eventBus = context.mock(EventBus.class);
private final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
private final ConnectivityChecker connectivityChecker = private final ConnectivityChecker connectivityChecker =
context.mock(ConnectivityChecker.class); context.mock(ConnectivityChecker.class);
private final MailboxApiCaller mailboxApiCaller = private final MailboxApiCaller mailboxApiCaller =
@@ -72,6 +80,9 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
private final MessageId ackedId = new MessageId(getRandomId()); private final MessageId ackedId = new MessageId(getRandomId());
private final MessageId sentId = new MessageId(getRandomId()); private final MessageId sentId = new MessageId(getRandomId());
private final MessageId newMessageId = new MessageId(getRandomId()); private final MessageId newMessageId = new MessageId(getRandomId());
private final GroupId groupId = new GroupId(getRandomId());
private final Map<ContactId, Boolean> groupVisibility =
singletonMap(contactId, true);
private File testDir, tempFile; private File testDir, tempFile;
private MailboxUploadWorker worker; private MailboxUploadWorker worker;
@@ -81,8 +92,9 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
testDir = getTestDirectory(); testDir = getTestDirectory();
tempFile = new File(testDir, "temp"); tempFile = new File(testDir, "temp");
worker = new MailboxUploadWorker(ioExecutor, db, clock, taskScheduler, worker = new MailboxUploadWorker(ioExecutor, db, clock, taskScheduler,
eventBus, connectivityChecker, mailboxApiCaller, mailboxApi, eventBus, connectionRegistry, connectivityChecker,
mailboxFileManager, mailboxProperties, folderId, contactId); mailboxApiCaller, mailboxApi, mailboxFileManager,
mailboxProperties, folderId, contactId);
} }
@After @After
@@ -93,8 +105,11 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testChecksForDataWhenStartedAndRemovesObserverWhenDestroyed() public void testChecksForDataWhenStartedAndRemovesObserverWhenDestroyed()
throws Exception { throws Exception {
// When the worker is started it should check for data to send // When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
worker.start(); worker.start();
@@ -106,15 +121,59 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.destroy(); worker.destroy();
} }
@Test
public void testDoesNotCheckForDataWhenStartedIfConnectedToContact() {
// When the worker is started it should check the connection registry.
// We're connected to the contact, so the worker should not check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(true);
worker.start();
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testChecksForDataWhenContactDisconnects() throws Exception {
// When the worker is started it should check the connection registry.
// We're connected to the contact, so the worker should not check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(true);
worker.start();
// When the contact disconnects, the worker should start a task to
// check for data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting();
worker.eventOccurred(new ContactDisconnectedEvent(contactId));
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test @Test
public void testChecksConnectivityWhenStartedIfDataIsReady() public void testChecksConnectivityWhenStartedIfDataIsReady()
throws Exception { throws Exception {
Transaction recordTxn = new Transaction(null, false); Transaction recordTxn = new Transaction(null, false);
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// there's data ready to send immediately, the worker should start a // We're not connected to the contact, so the worker should check for
// connectivity check // data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck(); expectCheckForDataToSendAndStartConnectivityCheck();
worker.start(); worker.start();
@@ -149,7 +208,9 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.onConnectivityCheckSucceeded(); worker.onConnectivityCheckSucceeded();
// When the upload task runs, it should upload the file, record // When the upload task runs, it should upload the file, record
// the acked/sent messages in the DB, and check for more data to send // the acked/sent messages in the DB, and check the connection
// registry. We're not connected to the contact, so the worker should
// check for more data to send
context.checking(new DbExpectations() {{ context.checking(new DbExpectations() {{
oneOf(mailboxApi).addFile(mailboxProperties, folderId, tempFile); oneOf(mailboxApi).addFile(mailboxProperties, folderId, tempFile);
oneOf(db).transaction(with(false), withDbRunnable(recordTxn)); oneOf(db).transaction(with(false), withDbRunnable(recordTxn));
@@ -157,6 +218,7 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
oneOf(db).setMessagesSent(recordTxn, contactId, oneOf(db).setMessagesSent(recordTxn, contactId,
singletonList(sentId), MAX_LATENCY); singletonList(sentId), MAX_LATENCY);
}}); }});
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
assertFalse(upload.get().callApi()); assertFalse(upload.get().callApi());
@@ -172,11 +234,41 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
} }
@Test @Test
public void testCancelsApiCallWhenDestroyed() throws Exception { public void testDoesNotWriteFileIfContactConnectsDuringConnectivityCheck()
// When the worker is started it should check for data to send. As throws Exception {
// there's data ready to send immediately, the worker should start a // When the worker is started it should check the connection registry.
// connectivity check // We're not connected to the contact, so the worker should check for
// data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck();
worker.start();
// Before the connectivity check succeeds, we make a direct connection
// to the contact
worker.eventOccurred(new ContactConnectedEvent(contactId));
// When the connectivity check succeeds, the worker should not start
// writing and uploading a file
worker.onConnectivityCheckSucceeded();
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testCancelsApiCallWhenDestroyed() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck(); expectCheckForDataToSendAndStartConnectivityCheck();
worker.start(); worker.start();
@@ -212,9 +304,7 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
// When the worker is destroyed it should remove the connectivity // When the worker is destroyed it should remove the connectivity
// observer and event listener and cancel the upload task // observer and event listener and cancel the upload task
context.checking(new Expectations() {{ expectCancelTask(apiCall);
oneOf(apiCall).cancel();
}});
expectRemoveObserverAndListener(); expectRemoveObserverAndListener();
worker.destroy(); worker.destroy();
@@ -230,16 +320,21 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testSchedulesWakeupWhenStartedIfDataIsNotReady() public void testSchedulesWakeupWhenStartedIfDataIsNotReady()
throws Exception { throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// the data isn't ready to send immediately, the worker should // We're not connected to the contact, so the worker should check for
// schedule a wakeup // data to send. As the data isn't ready to send immediately, the
// worker should schedule a wakeup
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
AtomicReference<Runnable> wakeup = new AtomicReference<>(); AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndScheduleWakeup(wakeup); expectCheckForDataToSendAndScheduleWakeup(wakeup);
worker.start(); worker.start();
// When the wakeup task runs it should check for data to send // When the wakeup task runs it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
wakeup.get().run(); wakeup.get().run();
@@ -252,21 +347,51 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
} }
@Test @Test
public void testCancelsWakeupIfDestroyedBeforeWakingUp() throws Exception { public void testCancelsWakeupIfContactConnectsBeforeWakingUp()
throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check for data to send. As
// the data isn't ready to send immediately, the worker should // the data isn't ready to send immediately, the worker should
// schedule a wakeup // schedule a wakeup
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
AtomicReference<Runnable> wakeup = new AtomicReference<>(); AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndScheduleWakeup(wakeup);
worker.start();
// Before the wakeup task runs, we make a direct connection to the
// contact. The worker should cancel the wakeup task
expectCancelTask(wakeupTask);
worker.eventOccurred(new ContactConnectedEvent(contactId));
// If the wakeup task runs anyway (cancellation came too late), it
// should return without doing anything
wakeup.get().run();
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testCancelsWakeupIfDestroyedBeforeWakingUp() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send. As the data isn't ready to send immediately, the
// worker should schedule a wakeup
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckForDataToSendAndScheduleWakeup(wakeup); expectCheckForDataToSendAndScheduleWakeup(wakeup);
worker.start(); worker.start();
// When the worker is destroyed it should cancel the wakeup and // When the worker is destroyed it should cancel the wakeup and
// remove the connectivity observer and event listener // remove the connectivity observer and event listener
context.checking(new Expectations() {{ expectCancelTask(wakeupTask);
oneOf(wakeupTask).cancel();
}});
expectRemoveObserverAndListener(); expectRemoveObserverAndListener();
worker.destroy(); worker.destroy();
@@ -279,10 +404,12 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testCancelsWakeupIfEventIsReceivedBeforeWakingUp() public void testCancelsWakeupIfEventIsReceivedBeforeWakingUp()
throws Exception { throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// the data isn't ready to send immediately, the worker should // We're not connected to the contact, so the worker should check for
// schedule a wakeup // data to send. As the data isn't ready to send immediately, the
// worker should schedule a wakeup
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
AtomicReference<Runnable> wakeup = new AtomicReference<>(); AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckForDataToSendAndScheduleWakeup(wakeup); expectCheckForDataToSendAndScheduleWakeup(wakeup);
@@ -293,11 +420,10 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
// wakeup task and schedule a check for new data after a short delay // wakeup task and schedule a check for new data after a short delay
AtomicReference<Runnable> check = new AtomicReference<>(); AtomicReference<Runnable> check = new AtomicReference<>();
expectScheduleCheck(check, CHECK_DELAY_MS); expectScheduleCheck(check, CHECK_DELAY_MS);
context.checking(new Expectations() {{ expectCancelTask(wakeupTask);
oneOf(wakeupTask).cancel();
}});
worker.eventOccurred(new MessageSharedEvent(newMessageId)); worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// If the wakeup task runs anyway (cancellation came too late), it // If the wakeup task runs anyway (cancellation came too late), it
// should return early when it finds the state has changed // should return early when it finds the state has changed
@@ -306,9 +432,13 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
// Before the check task runs, the worker receives another event that // Before the check task runs, the worker receives another event that
// indicates new data may be available. The event should be ignored, // indicates new data may be available. The event should be ignored,
// as a check for new data has already been scheduled // as a check for new data has already been scheduled
worker.eventOccurred(new MessageSharedEvent(newMessageId)); worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// When the check task runs, it should check for new data // When the check task runs, it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// new data
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
check.get().run(); check.get().run();
@@ -322,8 +452,11 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testCancelsCheckWhenDestroyed() throws Exception { public void testCancelsCheckWhenDestroyed() throws Exception {
// When the worker is started it should check for data to send // When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
worker.start(); worker.start();
@@ -334,13 +467,12 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
AtomicReference<Runnable> check = new AtomicReference<>(); AtomicReference<Runnable> check = new AtomicReference<>();
expectScheduleCheck(check, CHECK_DELAY_MS); expectScheduleCheck(check, CHECK_DELAY_MS);
worker.eventOccurred(new MessageSharedEvent(newMessageId)); worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// When the worker is destroyed it should cancel the check and // When the worker is destroyed it should cancel the check and
// remove the connectivity observer and event listener // remove the connectivity observer and event listener
context.checking(new Expectations() {{ expectCancelTask(checkTask);
oneOf(checkTask).cancel();
}});
expectRemoveObserverAndListener(); expectRemoveObserverAndListener();
worker.destroy(); worker.destroy();
@@ -350,13 +482,52 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
check.get().run(); check.get().run();
} }
@Test
public void testCancelsCheckIfContactConnects() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting();
worker.start();
// The worker receives an event that indicates new data may be
// available. The worker should schedule a check for new data after
// a short delay
AtomicReference<Runnable> check = new AtomicReference<>();
expectScheduleCheck(check, CHECK_DELAY_MS);
worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// Before the check task runs, we make a direct connection to the
// contact. The worker should cancel the check
expectCancelTask(checkTask);
worker.eventOccurred(new ContactConnectedEvent(contactId));
// If the check runs anyway (cancellation came too late), it should
// return early when it finds the state has changed
check.get().run();
// When the worker is destroyed it should cancel the check and
// remove the connectivity observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test @Test
public void testRetriesAfterDelayIfExceptionOccursWhileWritingFile() public void testRetriesAfterDelayIfExceptionOccursWhileWritingFile()
throws Exception { throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// there's data ready to send immediately, the worker should start a // We're not connected to the contact, so the worker should check for
// connectivity check // data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck(); expectCheckForDataToSendAndStartConnectivityCheck();
worker.start(); worker.start();
@@ -375,7 +546,10 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.onConnectivityCheckSucceeded(); worker.onConnectivityCheckSucceeded();
// When the check task runs it should check for new data // When the check task runs it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// new data
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
check.get().run(); check.get().run();
@@ -387,6 +561,13 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.destroy(); worker.destroy();
} }
private void expectCheckConnectionRegistry(boolean connected) {
context.checking(new Expectations() {{
oneOf(connectionRegistry).isConnected(contactId);
will(returnValue(connected));
}});
}
private void expectRunTaskOnIoExecutor() { private void expectRunTaskOnIoExecutor() {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(ioExecutor).execute(with(any(Runnable.class))); oneOf(ioExecutor).execute(with(any(Runnable.class)));
@@ -456,6 +637,12 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
}}); }});
} }
private void expectCancelTask(Cancellable task) {
context.checking(new Expectations() {{
oneOf(task).cancel();
}});
}
private void expectRemoveObserverAndListener() { private void expectRemoveObserverAndListener() {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(connectivityChecker).removeObserver(worker); oneOf(connectivityChecker).removeObserver(worker);