Merge branch '2352-do-not-create-files-for-upload-while-connected' into 'master'

Don't create files for upload while directly connected to contact

Closes #2352

See merge request briar/briar!1697
This commit is contained in:
Torsten Grote
2022-08-16 14:28:07 +00:00
11 changed files with 459 additions and 54 deletions

View File

@@ -283,6 +283,13 @@ public interface DatabaseComponent extends TransactionManager {
*/ */
Group getGroup(Transaction txn, GroupId g) throws DbException; Group getGroup(Transaction txn, GroupId g) throws DbException;
/**
* Returns the ID of the group containing the given message.
* <p/>
* Read-only.
*/
GroupId getGroupId(Transaction txn, MessageId m) throws DbException;
/** /**
* Returns the metadata for the given group. * Returns the metadata for the given group.
* <p/> * <p/>

View File

@@ -1,9 +1,14 @@
package org.briarproject.bramble.api.sync.event; package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Group.Visibility;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import java.util.Map;
import javax.annotation.concurrent.Immutable; import javax.annotation.concurrent.Immutable;
/** /**
@@ -14,12 +19,32 @@ import javax.annotation.concurrent.Immutable;
public class MessageSharedEvent extends Event { public class MessageSharedEvent extends Event {
private final MessageId messageId; private final MessageId messageId;
private final GroupId groupId;
private final Map<ContactId, Boolean> groupVisibility;
public MessageSharedEvent(MessageId message) { public MessageSharedEvent(MessageId message, GroupId groupId,
Map<ContactId, Boolean> groupVisibility) {
this.messageId = message; this.messageId = message;
this.groupId = groupId;
this.groupVisibility = groupVisibility;
} }
public MessageId getMessageId() { public MessageId getMessageId() {
return messageId; return messageId;
} }
public GroupId getGroupId() {
return groupId;
}
/**
* Returns the IDs of all contacts for which the visibility of the
* message's group is either {@link Visibility#SHARED shared} or
* {@link Visibility#VISIBLE visible}. The value in the map is true if the
* group is {@link Visibility#SHARED shared} or false if the group is
* {@link Visibility#VISIBLE visible}.
*/
public Map<ContactId, Boolean> getGroupVisibility() {
return groupVisibility;
}
} }

View File

@@ -320,6 +320,13 @@ interface Database<T> {
*/ */
Group getGroup(T txn, GroupId g) throws DbException; Group getGroup(T txn, GroupId g) throws DbException;
/**
* Returns the ID of the group containing the given message.
* <p/>
* Read-only.
*/
GroupId getGroupId(T txn, MessageId m) throws DbException;
/** /**
* Returns the metadata for the given group. * Returns the metadata for the given group.
* <p/> * <p/>
@@ -345,8 +352,11 @@ interface Database<T> {
throws DbException; throws DbException;
/** /**
* Returns the IDs of all contacts to which the given group's visibility is * Returns the IDs of all contacts for which the given group's visibility
* either {@link Visibility VISIBLE} or {@link Visibility SHARED}. * is either {@link Visibility#SHARED shared} or
* {@link Visibility#VISIBLE visible}. The value in the map is true if the
* group is {@link Visibility#SHARED shared} or false if the group is
* {@link Visibility#VISIBLE visible}.
* <p/> * <p/>
* Read-only. * Read-only.
*/ */

View File

@@ -287,7 +287,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new MessageAddedEvent(m, null)); transaction.attach(new MessageAddedEvent(m, null));
transaction.attach(new MessageStateChangedEvent(m.getId(), true, transaction.attach(new MessageStateChangedEvent(m.getId(), true,
DELIVERED)); DELIVERED));
if (shared) transaction.attach(new MessageSharedEvent(m.getId())); if (shared) {
Map<ContactId, Boolean> visibility =
db.getGroupVisibility(txn, m.getGroupId());
transaction.attach(new MessageSharedEvent(m.getId(),
m.getGroupId(), visibility));
}
} }
db.mergeMessageMetadata(txn, m.getId(), meta); db.mergeMessageMetadata(txn, m.getId(), meta);
} }
@@ -550,6 +555,15 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getGroup(txn, g); return db.getGroup(txn, g);
} }
@Override
public GroupId getGroupId(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
return db.getGroupId(txn, m);
}
@Override @Override
public Metadata getGroupMetadata(Transaction transaction, GroupId g) public Metadata getGroupMetadata(Transaction transaction, GroupId g)
throws DbException { throws DbException {
@@ -1182,7 +1196,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (db.getMessageState(txn, m) != DELIVERED) if (db.getMessageState(txn, m) != DELIVERED)
throw new IllegalArgumentException("Shared undelivered message"); throw new IllegalArgumentException("Shared undelivered message");
db.setMessageShared(txn, m, true); db.setMessageShared(txn, m, true);
transaction.attach(new MessageSharedEvent(m)); GroupId g = db.getGroupId(txn, m);
Map<ContactId, Boolean> visibility = db.getGroupVisibility(txn, g);
transaction.attach(new MessageSharedEvent(m, g, visibility));
} }
@Override @Override

View File

@@ -1683,6 +1683,27 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public GroupId getGroupId(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT groupId FROM messages WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
rs = ps.executeQuery();
if (!rs.next()) throw new DbStateException();
GroupId g = new GroupId(rs.getBytes(1));
rs.close();
ps.close();
return g;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public Collection<Group> getGroups(Connection txn, ClientId c, public Collection<Group> getGroups(Connection txn, ClientId c,
int majorVersion) throws DbException { int majorVersion) throws DbException {

View File

@@ -1,6 +1,7 @@
package org.briarproject.bramble.mailbox; package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.Cancellable; import org.briarproject.bramble.api.Cancellable;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.db.DbException;
@@ -12,6 +13,8 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.mailbox.MailboxFolderId; import org.briarproject.bramble.api.mailbox.MailboxFolderId;
import org.briarproject.bramble.api.mailbox.MailboxProperties; import org.briarproject.bramble.api.mailbox.MailboxProperties;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
@@ -32,6 +35,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import static java.lang.Boolean.TRUE;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
@@ -58,9 +62,16 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
* <p> * <p>
* If there's no data to send, the worker listens for events indicating * If there's no data to send, the worker listens for events indicating
* that new data may be ready to send. * that new data may be ready to send.
* <p>
* Whenever we're directly connected to the contact, the worker doesn't
* check for data to send or start connectivity checks until the contact
* disconnects. However, if the worker has already started writing and
* uploading a file when the contact connects, the worker will finish the
* upload.
*/ */
private enum State { private enum State {
CREATED, CREATED,
CONNECTED_TO_CONTACT,
CHECKING_FOR_DATA, CHECKING_FOR_DATA,
WAITING_FOR_DATA, WAITING_FOR_DATA,
CONNECTIVITY_CHECK, CONNECTIVITY_CHECK,
@@ -95,6 +106,7 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
private final Clock clock; private final Clock clock;
private final TaskScheduler taskScheduler; private final TaskScheduler taskScheduler;
private final EventBus eventBus; private final EventBus eventBus;
private final ConnectionRegistry connectionRegistry;
private final ConnectivityChecker connectivityChecker; private final ConnectivityChecker connectivityChecker;
private final MailboxApiCaller mailboxApiCaller; private final MailboxApiCaller mailboxApiCaller;
private final MailboxApi mailboxApi; private final MailboxApi mailboxApi;
@@ -121,6 +133,7 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
Clock clock, Clock clock,
TaskScheduler taskScheduler, TaskScheduler taskScheduler,
EventBus eventBus, EventBus eventBus,
ConnectionRegistry connectionRegistry,
ConnectivityChecker connectivityChecker, ConnectivityChecker connectivityChecker,
MailboxApiCaller mailboxApiCaller, MailboxApiCaller mailboxApiCaller,
MailboxApi mailboxApi, MailboxApi mailboxApi,
@@ -133,6 +146,7 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
this.clock = clock; this.clock = clock;
this.taskScheduler = taskScheduler; this.taskScheduler = taskScheduler;
this.eventBus = eventBus; this.eventBus = eventBus;
this.connectionRegistry = connectionRegistry;
this.connectivityChecker = connectivityChecker; this.connectivityChecker = connectivityChecker;
this.mailboxApiCaller = mailboxApiCaller; this.mailboxApiCaller = mailboxApiCaller;
this.mailboxApi = mailboxApi; this.mailboxApi = mailboxApi;
@@ -182,6 +196,12 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
synchronized (lock) { synchronized (lock) {
checkTask = null; checkTask = null;
if (state != State.CHECKING_FOR_DATA) return; if (state != State.CHECKING_FOR_DATA) return;
// Check whether we're directly connected to the contact. Calling
// this while holding the lock isn't ideal, but it avoids races
if (connectionRegistry.isConnected(contactId)) {
state = State.CONNECTED_TO_CONTACT;
return;
}
} }
LOG.info("Checking for data to send"); LOG.info("Checking for data to send");
try { try {
@@ -364,8 +384,14 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
onDataToSend(); onDataToSend();
} }
} else if (e instanceof MessageSharedEvent) { } else if (e instanceof MessageSharedEvent) {
LOG.info("Message shared"); MessageSharedEvent m = (MessageSharedEvent) e;
onDataToSend(); // If the contact is present in the map (ie the value is not null)
// and the value is true, the message's group is shared with the
// contact and therefore the message may now be sendable
if (m.getGroupVisibility().get(contactId) == TRUE) {
LOG.info("Message shared");
onDataToSend();
}
} else if (e instanceof GroupVisibilityUpdatedEvent) { } else if (e instanceof GroupVisibilityUpdatedEvent) {
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e; GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
if (g.getVisibility() == SHARED && if (g.getVisibility() == SHARED &&
@@ -373,6 +399,18 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
LOG.info("Group shared"); LOG.info("Group shared");
onDataToSend(); onDataToSend();
} }
} else if (e instanceof ContactConnectedEvent) {
ContactConnectedEvent c = (ContactConnectedEvent) e;
if (c.getContactId().equals(contactId)) {
LOG.info("Contact connected");
onContactConnected();
}
} else if (e instanceof ContactDisconnectedEvent) {
ContactDisconnectedEvent c = (ContactDisconnectedEvent) e;
if (c.getContactId().equals(contactId)) {
LOG.info("Contact disconnected");
onContactDisconnected();
}
} }
} }
@@ -391,4 +429,36 @@ class MailboxUploadWorker implements MailboxWorker, ConnectivityObserver,
// If we had scheduled a wakeup when data was due to be sent, cancel it // If we had scheduled a wakeup when data was due to be sent, cancel it
if (wakeupTask != null) wakeupTask.cancel(); if (wakeupTask != null) wakeupTask.cancel();
} }
@EventExecutor
private void onContactConnected() {
Cancellable wakeupTask = null, checkTask = null;
synchronized (lock) {
if (state == State.DESTROYED) return;
// If we're checking for data to send, waiting for data to send,
// or checking connectivity then wait until we disconnect from
// the contact before proceeding. If we're writing or uploading
// a file then continue
if (state == State.CHECKING_FOR_DATA ||
state == State.WAITING_FOR_DATA ||
state == State.CONNECTIVITY_CHECK) {
state = State.CONNECTED_TO_CONTACT;
wakeupTask = this.wakeupTask;
this.wakeupTask = null;
checkTask = this.checkTask;
this.checkTask = null;
}
}
if (wakeupTask != null) wakeupTask.cancel();
if (checkTask != null) checkTask.cancel();
}
@EventExecutor
private void onContactDisconnected() {
synchronized (lock) {
if (state != State.CONNECTED_TO_CONTACT) return;
state = State.CHECKING_FOR_DATA;
}
ioExecutor.execute(this::checkForDataToSend);
}
} }

View File

@@ -1,5 +1,6 @@
package org.briarproject.bramble.mailbox; package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
@@ -25,6 +26,7 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
private final Clock clock; private final Clock clock;
private final TaskScheduler taskScheduler; private final TaskScheduler taskScheduler;
private final EventBus eventBus; private final EventBus eventBus;
private final ConnectionRegistry connectionRegistry;
private final MailboxApiCaller mailboxApiCaller; private final MailboxApiCaller mailboxApiCaller;
private final MailboxApi mailboxApi; private final MailboxApi mailboxApi;
private final MailboxFileManager mailboxFileManager; private final MailboxFileManager mailboxFileManager;
@@ -36,6 +38,7 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
Clock clock, Clock clock,
TaskScheduler taskScheduler, TaskScheduler taskScheduler,
EventBus eventBus, EventBus eventBus,
ConnectionRegistry connectionRegistry,
MailboxApiCaller mailboxApiCaller, MailboxApiCaller mailboxApiCaller,
MailboxApi mailboxApi, MailboxApi mailboxApi,
MailboxFileManager mailboxFileManager, MailboxFileManager mailboxFileManager,
@@ -45,6 +48,7 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
this.clock = clock; this.clock = clock;
this.taskScheduler = taskScheduler; this.taskScheduler = taskScheduler;
this.eventBus = eventBus; this.eventBus = eventBus;
this.connectionRegistry = connectionRegistry;
this.mailboxApiCaller = mailboxApiCaller; this.mailboxApiCaller = mailboxApiCaller;
this.mailboxApi = mailboxApi; this.mailboxApi = mailboxApi;
this.mailboxFileManager = mailboxFileManager; this.mailboxFileManager = mailboxFileManager;
@@ -57,9 +61,9 @@ class MailboxWorkerFactoryImpl implements MailboxWorkerFactory {
MailboxProperties properties, MailboxFolderId folderId, MailboxProperties properties, MailboxFolderId folderId,
ContactId contactId) { ContactId contactId) {
MailboxUploadWorker worker = new MailboxUploadWorker(ioExecutor, db, MailboxUploadWorker worker = new MailboxUploadWorker(ioExecutor, db,
clock, taskScheduler, eventBus, connectivityChecker, clock, taskScheduler, eventBus, connectionRegistry,
mailboxApiCaller, mailboxApi, mailboxFileManager, connectivityChecker, mailboxApiCaller, mailboxApi,
properties, folderId, contactId); mailboxFileManager, properties, folderId, contactId);
eventBus.addListener(worker); eventBus.addListener(worker);
return worker; return worker;
} }

View File

@@ -44,6 +44,7 @@ import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import static java.lang.Boolean.TRUE;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
@@ -233,7 +234,13 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
ContactRemovedEvent c = (ContactRemovedEvent) e; ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt(); if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageSharedEvent) { } else if (e instanceof MessageSharedEvent) {
generateOffer(); MessageSharedEvent m = (MessageSharedEvent) e;
// If the contact is present in the map (ie the value is not null)
// and the value is true, the message's group is shared with the
// contact and therefore the message may now be sendable
if (m.getGroupVisibility().get(contactId) == TRUE) {
generateOffer();
}
} else if (e instanceof GroupVisibilityUpdatedEvent) { } else if (e instanceof GroupVisibilityUpdatedEvent) {
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e; GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
if (g.getVisibility() == SHARED && if (g.getVisibility() == SHARED &&

View File

@@ -694,11 +694,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
throws Exception { throws Exception {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Check whether the message is in the DB (which it's not) // Check whether the message is in the DB (which it's not)
exactly(15).of(database).startTransaction(); exactly(16).of(database).startTransaction();
will(returnValue(txn)); will(returnValue(txn));
exactly(15).of(database).containsMessage(txn, messageId); exactly(16).of(database).containsMessage(txn, messageId);
will(returnValue(false)); will(returnValue(false));
exactly(15).of(database).abortTransaction(txn); exactly(16).of(database).abortTransaction(txn);
// Allow other checks to pass // Allow other checks to pass
allowing(database).containsContact(txn, contactId); allowing(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
@@ -722,6 +722,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
// Expected // Expected
} }
try {
db.transaction(true, transaction ->
db.getGroupId(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
// Expected
}
try { try {
db.transaction(true, transaction -> db.transaction(true, transaction ->
db.getMessage(transaction, messageId)); db.getMessage(transaction, messageId));

View File

@@ -168,6 +168,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(db.containsContact(txn, contactId)); assertTrue(db.containsContact(txn, contactId));
assertTrue(db.containsGroup(txn, groupId)); assertTrue(db.containsGroup(txn, groupId));
assertTrue(db.containsMessage(txn, messageId)); assertTrue(db.containsMessage(txn, messageId));
assertEquals(groupId, db.getGroupId(txn, messageId));
assertArrayEquals(message.getBody(), assertArrayEquals(message.getBody(),
db.getMessage(txn, messageId).getBody()); db.getMessage(txn, messageId).getBody());

View File

@@ -1,12 +1,16 @@
package org.briarproject.bramble.mailbox; package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.Cancellable; import org.briarproject.bramble.api.Cancellable;
import org.briarproject.bramble.api.connection.ConnectionRegistry;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.mailbox.MailboxFolderId; import org.briarproject.bramble.api.mailbox.MailboxFolderId;
import org.briarproject.bramble.api.mailbox.MailboxProperties; import org.briarproject.bramble.api.mailbox.MailboxProperties;
import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
@@ -25,10 +29,13 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS; import static org.briarproject.bramble.api.mailbox.MailboxConstants.CLIENT_SUPPORTS;
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY; import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY;
@@ -50,6 +57,8 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
private final TaskScheduler taskScheduler = private final TaskScheduler taskScheduler =
context.mock(TaskScheduler.class); context.mock(TaskScheduler.class);
private final EventBus eventBus = context.mock(EventBus.class); private final EventBus eventBus = context.mock(EventBus.class);
private final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
private final ConnectivityChecker connectivityChecker = private final ConnectivityChecker connectivityChecker =
context.mock(ConnectivityChecker.class); context.mock(ConnectivityChecker.class);
private final MailboxApiCaller mailboxApiCaller = private final MailboxApiCaller mailboxApiCaller =
@@ -72,6 +81,9 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
private final MessageId ackedId = new MessageId(getRandomId()); private final MessageId ackedId = new MessageId(getRandomId());
private final MessageId sentId = new MessageId(getRandomId()); private final MessageId sentId = new MessageId(getRandomId());
private final MessageId newMessageId = new MessageId(getRandomId()); private final MessageId newMessageId = new MessageId(getRandomId());
private final GroupId groupId = new GroupId(getRandomId());
private final Map<ContactId, Boolean> groupVisibility =
singletonMap(contactId, true);
private File testDir, tempFile; private File testDir, tempFile;
private MailboxUploadWorker worker; private MailboxUploadWorker worker;
@@ -81,8 +93,9 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
testDir = getTestDirectory(); testDir = getTestDirectory();
tempFile = new File(testDir, "temp"); tempFile = new File(testDir, "temp");
worker = new MailboxUploadWorker(ioExecutor, db, clock, taskScheduler, worker = new MailboxUploadWorker(ioExecutor, db, clock, taskScheduler,
eventBus, connectivityChecker, mailboxApiCaller, mailboxApi, eventBus, connectionRegistry, connectivityChecker,
mailboxFileManager, mailboxProperties, folderId, contactId); mailboxApiCaller, mailboxApi, mailboxFileManager,
mailboxProperties, folderId, contactId);
} }
@After @After
@@ -93,8 +106,11 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testChecksForDataWhenStartedAndRemovesObserverWhenDestroyed() public void testChecksForDataWhenStartedAndRemovesObserverWhenDestroyed()
throws Exception { throws Exception {
// When the worker is started it should check for data to send // When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
worker.start(); worker.start();
@@ -106,15 +122,59 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.destroy(); worker.destroy();
} }
@Test
public void testDoesNotCheckForDataWhenStartedIfConnectedToContact() {
// When the worker is started it should check the connection registry.
// We're connected to the contact, so the worker should not check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(true);
worker.start();
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testChecksForDataWhenContactDisconnects() throws Exception {
// When the worker is started it should check the connection registry.
// We're connected to the contact, so the worker should not check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(true);
worker.start();
// When the contact disconnects, the worker should start a task to
// check for data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting();
worker.eventOccurred(new ContactDisconnectedEvent(contactId));
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test @Test
public void testChecksConnectivityWhenStartedIfDataIsReady() public void testChecksConnectivityWhenStartedIfDataIsReady()
throws Exception { throws Exception {
Transaction recordTxn = new Transaction(null, false); Transaction recordTxn = new Transaction(null, false);
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// there's data ready to send immediately, the worker should start a // We're not connected to the contact, so the worker should check for
// connectivity check // data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck(); expectCheckForDataToSendAndStartConnectivityCheck();
worker.start(); worker.start();
@@ -149,7 +209,9 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.onConnectivityCheckSucceeded(); worker.onConnectivityCheckSucceeded();
// When the upload task runs, it should upload the file, record // When the upload task runs, it should upload the file, record
// the acked/sent messages in the DB, and check for more data to send // the acked/sent messages in the DB, and check the connection
// registry. We're not connected to the contact, so the worker should
// check for more data to send
context.checking(new DbExpectations() {{ context.checking(new DbExpectations() {{
oneOf(mailboxApi).addFile(mailboxProperties, folderId, tempFile); oneOf(mailboxApi).addFile(mailboxProperties, folderId, tempFile);
oneOf(db).transaction(with(false), withDbRunnable(recordTxn)); oneOf(db).transaction(with(false), withDbRunnable(recordTxn));
@@ -157,6 +219,7 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
oneOf(db).setMessagesSent(recordTxn, contactId, oneOf(db).setMessagesSent(recordTxn, contactId,
singletonList(sentId), MAX_LATENCY); singletonList(sentId), MAX_LATENCY);
}}); }});
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
assertFalse(upload.get().callApi()); assertFalse(upload.get().callApi());
@@ -172,11 +235,41 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
} }
@Test @Test
public void testCancelsApiCallWhenDestroyed() throws Exception { public void testDoesNotWriteFileIfContactConnectsDuringConnectivityCheck()
// When the worker is started it should check for data to send. As throws Exception {
// there's data ready to send immediately, the worker should start a // When the worker is started it should check the connection registry.
// connectivity check // We're not connected to the contact, so the worker should check for
// data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck();
worker.start();
// Before the connectivity check succeeds, we make a direct connection
// to the contact
worker.eventOccurred(new ContactConnectedEvent(contactId));
// When the connectivity check succeeds, the worker should not start
// writing and uploading a file
worker.onConnectivityCheckSucceeded();
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testCancelsApiCallWhenDestroyed() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck(); expectCheckForDataToSendAndStartConnectivityCheck();
worker.start(); worker.start();
@@ -212,9 +305,7 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
// When the worker is destroyed it should remove the connectivity // When the worker is destroyed it should remove the connectivity
// observer and event listener and cancel the upload task // observer and event listener and cancel the upload task
context.checking(new Expectations() {{ expectCancelTask(apiCall);
oneOf(apiCall).cancel();
}});
expectRemoveObserverAndListener(); expectRemoveObserverAndListener();
worker.destroy(); worker.destroy();
@@ -230,16 +321,21 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testSchedulesWakeupWhenStartedIfDataIsNotReady() public void testSchedulesWakeupWhenStartedIfDataIsNotReady()
throws Exception { throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// the data isn't ready to send immediately, the worker should // We're not connected to the contact, so the worker should check for
// schedule a wakeup // data to send. As the data isn't ready to send immediately, the
// worker should schedule a wakeup
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
AtomicReference<Runnable> wakeup = new AtomicReference<>(); AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndScheduleWakeup(wakeup); expectCheckForDataToSendAndScheduleWakeup(wakeup);
worker.start(); worker.start();
// When the wakeup task runs it should check for data to send // When the wakeup task runs it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
wakeup.get().run(); wakeup.get().run();
@@ -252,21 +348,51 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
} }
@Test @Test
public void testCancelsWakeupIfDestroyedBeforeWakingUp() throws Exception { public void testCancelsWakeupIfContactConnectsBeforeWakingUp()
throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check for data to send. As
// the data isn't ready to send immediately, the worker should // the data isn't ready to send immediately, the worker should
// schedule a wakeup // schedule a wakeup
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
AtomicReference<Runnable> wakeup = new AtomicReference<>(); AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndScheduleWakeup(wakeup);
worker.start();
// Before the wakeup task runs, we make a direct connection to the
// contact. The worker should cancel the wakeup task
expectCancelTask(wakeupTask);
worker.eventOccurred(new ContactConnectedEvent(contactId));
// If the wakeup task runs anyway (cancellation came too late), it
// should return without doing anything
wakeup.get().run();
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testCancelsWakeupIfDestroyedBeforeWakingUp() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send. As the data isn't ready to send immediately, the
// worker should schedule a wakeup
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckForDataToSendAndScheduleWakeup(wakeup); expectCheckForDataToSendAndScheduleWakeup(wakeup);
worker.start(); worker.start();
// When the worker is destroyed it should cancel the wakeup and // When the worker is destroyed it should cancel the wakeup and
// remove the connectivity observer and event listener // remove the connectivity observer and event listener
context.checking(new Expectations() {{ expectCancelTask(wakeupTask);
oneOf(wakeupTask).cancel();
}});
expectRemoveObserverAndListener(); expectRemoveObserverAndListener();
worker.destroy(); worker.destroy();
@@ -279,10 +405,12 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testCancelsWakeupIfEventIsReceivedBeforeWakingUp() public void testCancelsWakeupIfEventIsReceivedBeforeWakingUp()
throws Exception { throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// the data isn't ready to send immediately, the worker should // We're not connected to the contact, so the worker should check for
// schedule a wakeup // data to send. As the data isn't ready to send immediately, the
// worker should schedule a wakeup
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
AtomicReference<Runnable> wakeup = new AtomicReference<>(); AtomicReference<Runnable> wakeup = new AtomicReference<>();
expectCheckForDataToSendAndScheduleWakeup(wakeup); expectCheckForDataToSendAndScheduleWakeup(wakeup);
@@ -293,11 +421,10 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
// wakeup task and schedule a check for new data after a short delay // wakeup task and schedule a check for new data after a short delay
AtomicReference<Runnable> check = new AtomicReference<>(); AtomicReference<Runnable> check = new AtomicReference<>();
expectScheduleCheck(check, CHECK_DELAY_MS); expectScheduleCheck(check, CHECK_DELAY_MS);
context.checking(new Expectations() {{ expectCancelTask(wakeupTask);
oneOf(wakeupTask).cancel();
}});
worker.eventOccurred(new MessageSharedEvent(newMessageId)); worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// If the wakeup task runs anyway (cancellation came too late), it // If the wakeup task runs anyway (cancellation came too late), it
// should return early when it finds the state has changed // should return early when it finds the state has changed
@@ -306,9 +433,13 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
// Before the check task runs, the worker receives another event that // Before the check task runs, the worker receives another event that
// indicates new data may be available. The event should be ignored, // indicates new data may be available. The event should be ignored,
// as a check for new data has already been scheduled // as a check for new data has already been scheduled
worker.eventOccurred(new MessageSharedEvent(newMessageId)); worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// When the check task runs, it should check for new data // When the check task runs, it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// new data
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
check.get().run(); check.get().run();
@@ -322,8 +453,11 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
@Test @Test
public void testCancelsCheckWhenDestroyed() throws Exception { public void testCancelsCheckWhenDestroyed() throws Exception {
// When the worker is started it should check for data to send // When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
worker.start(); worker.start();
@@ -334,13 +468,12 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
AtomicReference<Runnable> check = new AtomicReference<>(); AtomicReference<Runnable> check = new AtomicReference<>();
expectScheduleCheck(check, CHECK_DELAY_MS); expectScheduleCheck(check, CHECK_DELAY_MS);
worker.eventOccurred(new MessageSharedEvent(newMessageId)); worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// When the worker is destroyed it should cancel the check and // When the worker is destroyed it should cancel the check and
// remove the connectivity observer and event listener // remove the connectivity observer and event listener
context.checking(new Expectations() {{ expectCancelTask(checkTask);
oneOf(checkTask).cancel();
}});
expectRemoveObserverAndListener(); expectRemoveObserverAndListener();
worker.destroy(); worker.destroy();
@@ -350,13 +483,100 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
check.get().run(); check.get().run();
} }
@Test
public void testCancelsCheckIfContactConnects() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting();
worker.start();
// The worker receives an event that indicates new data may be
// available. The worker should schedule a check for new data after
// a short delay
AtomicReference<Runnable> check = new AtomicReference<>();
expectScheduleCheck(check, CHECK_DELAY_MS);
worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
groupVisibility));
// Before the check task runs, we make a direct connection to the
// contact. The worker should cancel the check
expectCancelTask(checkTask);
worker.eventOccurred(new ContactConnectedEvent(contactId));
// If the check runs anyway (cancellation came too late), it should
// return early when it finds the state has changed
check.get().run();
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testDoesNotScheduleCheckIfGroupIsVisible() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting();
worker.start();
// The worker receives an event that indicates new data may be
// available. The group is visible to the contact but not shared, so
// the worker should not schedule a check for new data
worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
singletonMap(contactId, false)));
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test
public void testDoesNotScheduleCheckIfGroupIsInvisible() throws Exception {
// When the worker is started it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// data to send
expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting();
worker.start();
// The worker receives an event that indicates new data may be
// available. The group is not visible to the contact, so the worker
// should not schedule a check for new data
worker.eventOccurred(new MessageSharedEvent(newMessageId, groupId,
emptyMap()));
// When the worker is destroyed it should remove the connectivity
// observer and event listener
expectRemoveObserverAndListener();
worker.destroy();
}
@Test @Test
public void testRetriesAfterDelayIfExceptionOccursWhileWritingFile() public void testRetriesAfterDelayIfExceptionOccursWhileWritingFile()
throws Exception { throws Exception {
// When the worker is started it should check for data to send. As // When the worker is started it should check the connection registry.
// there's data ready to send immediately, the worker should start a // We're not connected to the contact, so the worker should check for
// connectivity check // data to send. As there's data ready to send immediately, the worker
// should start a connectivity check
expectRunTaskOnIoExecutor(); expectRunTaskOnIoExecutor();
expectCheckConnectionRegistry(false);
expectCheckForDataToSendAndStartConnectivityCheck(); expectCheckForDataToSendAndStartConnectivityCheck();
worker.start(); worker.start();
@@ -375,7 +595,10 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.onConnectivityCheckSucceeded(); worker.onConnectivityCheckSucceeded();
// When the check task runs it should check for new data // When the check task runs it should check the connection registry.
// We're not connected to the contact, so the worker should check for
// new data
expectCheckConnectionRegistry(false);
expectCheckForDataToSendNoDataWaiting(); expectCheckForDataToSendNoDataWaiting();
check.get().run(); check.get().run();
@@ -387,6 +610,13 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
worker.destroy(); worker.destroy();
} }
private void expectCheckConnectionRegistry(boolean connected) {
context.checking(new Expectations() {{
oneOf(connectionRegistry).isConnected(contactId);
will(returnValue(connected));
}});
}
private void expectRunTaskOnIoExecutor() { private void expectRunTaskOnIoExecutor() {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(ioExecutor).execute(with(any(Runnable.class))); oneOf(ioExecutor).execute(with(any(Runnable.class)));
@@ -456,6 +686,12 @@ public class MailboxUploadWorkerTest extends BrambleMockTestCase {
}}); }});
} }
private void expectCancelTask(Cancellable task) {
context.checking(new Expectations() {{
oneOf(task).cancel();
}});
}
private void expectRemoveObserverAndListener() { private void expectRemoveObserverAndListener() {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(connectivityChecker).removeObserver(worker); oneOf(connectivityChecker).removeObserver(worker);