From 8dcf988399120f016addcc2fb4825cd88431bf94 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Mon, 20 Jun 2022 15:50:59 +0100 Subject: [PATCH] Add contact list worker for own mailbox. --- .../mailbox/OwnMailboxContactListWorker.java | 351 ++++++++++++++++++ 1 file changed, 351 insertions(+) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxContactListWorker.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxContactListWorker.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxContactListWorker.java new file mode 100644 index 000000000..d457f6026 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/OwnMailboxContactListWorker.java @@ -0,0 +1,351 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.Cancellable; +import org.briarproject.bramble.api.contact.Contact; +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.contact.event.ContactAddedEvent; +import org.briarproject.bramble.api.contact.event.ContactRemovedEvent; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.db.NoSuchContactException; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.event.EventExecutor; +import org.briarproject.bramble.api.event.EventListener; +import org.briarproject.bramble.api.lifecycle.IoExecutor; +import org.briarproject.bramble.api.mailbox.MailboxProperties; +import org.briarproject.bramble.api.mailbox.MailboxUpdate; +import org.briarproject.bramble.api.mailbox.MailboxUpdateManager; +import org.briarproject.bramble.api.mailbox.MailboxUpdateWithMailbox; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.mailbox.ConnectivityChecker.ConnectivityObserver; +import org.briarproject.bramble.mailbox.MailboxApi.ApiException; +import org.briarproject.bramble.mailbox.MailboxApi.MailboxContact; +import org.briarproject.bramble.mailbox.MailboxApi.TolerableFailureException; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; +import static org.briarproject.bramble.util.LogUtils.logException; + +@ThreadSafe +@NotNullByDefault +class OwnMailboxContactListWorker + implements MailboxWorker, ConnectivityObserver, EventListener { + + /** + * When the worker is started it waits for a connectivity check, then + * fetches the remote contact list and compares it to the local contact + * list. + *

+ * Any contacts that are missing from the remote list are added to the + * mailbox's contact list, while any contacts that are missing from the + * local list are removed from the mailbox's contact list. + *

+ * Once the remote contact list has been brought up to date, the worker + * waits for events indicating that contacts have been added or removed. + * Each time an event is received, the worker updates the mailbox's + * contact list and then goes back to waiting. + */ + private enum State { + CREATED, + CONNECTIVITY_CHECK, + FETCHING_CONTACT_LIST, + UPDATING_CONTACT_LIST, + WAITING_FOR_CHANGES, + DESTROYED + } + + private static final Logger LOG = + getLogger(OwnMailboxContactListWorker.class.getName()); + + private final Executor ioExecutor; + private final DatabaseComponent db; + private final EventBus eventBus; + private final ConnectivityChecker connectivityChecker; + private final MailboxApiCaller mailboxApiCaller; + private final MailboxApi mailboxApi; + private final MailboxUpdateManager mailboxUpdateManager; + private final MailboxProperties mailboxProperties; + private final Object lock = new Object(); + + @GuardedBy("lock") + private State state = State.CREATED; + + @GuardedBy("lock") + @Nullable + private Cancellable apiCall = null; + + @GuardedBy("lock") + private final Queue updates = new LinkedList<>(); + + OwnMailboxContactListWorker(@IoExecutor Executor ioExecutor, + DatabaseComponent db, + EventBus eventBus, + ConnectivityChecker connectivityChecker, + MailboxApiCaller mailboxApiCaller, + MailboxApi mailboxApi, + MailboxUpdateManager mailboxUpdateManager, + MailboxProperties mailboxProperties) { + this.ioExecutor = ioExecutor; + this.db = db; + this.connectivityChecker = connectivityChecker; + this.mailboxApiCaller = mailboxApiCaller; + this.mailboxApi = mailboxApi; + this.mailboxUpdateManager = mailboxUpdateManager; + this.mailboxProperties = mailboxProperties; + this.eventBus = eventBus; + } + + @Override + public void start() { + LOG.info("Started"); + synchronized (lock) { + if (state != State.CREATED) return; + state = State.CONNECTIVITY_CHECK; + } + // Avoid leaking observer in case destroy() is called concurrently + // before observer is added + connectivityChecker.checkConnectivity(mailboxProperties, this); + boolean destroyed; + synchronized (lock) { + destroyed = state == State.DESTROYED; + } + if (destroyed) connectivityChecker.removeObserver(this); + } + + @Override + public void destroy() { + LOG.info("Destroyed"); + Cancellable apiCall; + synchronized (lock) { + state = State.DESTROYED; + apiCall = this.apiCall; + this.apiCall = null; + } + if (apiCall != null) apiCall.cancel(); + connectivityChecker.removeObserver(this); + eventBus.removeListener(this); + } + + @Override + public void onConnectivityCheckSucceeded() { + LOG.info("Connectivity check succeeded"); + synchronized (lock) { + if (state != State.CONNECTIVITY_CHECK) return; + state = State.FETCHING_CONTACT_LIST; + apiCall = mailboxApiCaller.retryWithBackoff( + new SimpleApiCall(this::apiCallFetchContactList)); + } + } + + @IoExecutor + private void apiCallFetchContactList() throws IOException, ApiException { + synchronized (lock) { + if (state != State.FETCHING_CONTACT_LIST) return; + } + LOG.info("Fetching remote contact list"); + Collection remote = + mailboxApi.getContacts(mailboxProperties); + ioExecutor.execute(() -> loadLocalContactList(remote)); + } + + @IoExecutor + private void loadLocalContactList(Collection remote) { + synchronized (lock) { + if (state != State.FETCHING_CONTACT_LIST) return; + apiCall = null; + } + LOG.info("Loading local contact list"); + try { + db.transaction(true, txn -> { + Collection local = db.getContacts(txn); + // Handle the result on the event executor to avoid races with + // incoming events + txn.attach(() -> reconcileContactLists(local, remote)); + }); + } catch (DbException e) { + logException(LOG, WARNING, e); + } + } + + @EventExecutor + private void reconcileContactLists(Collection local, + Collection remote) { + Set localIds = new HashSet<>(); + for (Contact c : local) localIds.add(c.getId()); + remote = new HashSet<>(remote); + synchronized (lock) { + if (state != State.FETCHING_CONTACT_LIST) return; + for (ContactId c : localIds) { + if (!remote.contains(c)) updates.add(new Update(true, c)); + } + for (ContactId c : remote) { + if (!localIds.contains(c)) updates.add(new Update(false, c)); + } + if (updates.isEmpty()) { + LOG.info("Contact list is up to date"); + state = State.WAITING_FOR_CHANGES; + } else { + if (LOG.isLoggable(INFO)) { + LOG.info(updates.size() + " updates to apply"); + } + state = State.UPDATING_CONTACT_LIST; + ioExecutor.execute(this::updateContactList); + } + } + } + + @IoExecutor + private void updateContactList() { + Update update; + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST) return; + update = updates.poll(); + if (update == null) { + LOG.info("Contact list is up to date"); + state = State.WAITING_FOR_CHANGES; + apiCall = null; + return; + } + } + if (update.add) loadMailboxProperties(update.contactId); + else removeContact(update.contactId); + } + + @IoExecutor + private void loadMailboxProperties(ContactId c) { + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST) return; + } + LOG.info("Loading mailbox properties for contact"); + try { + MailboxUpdate mailboxUpdate = db.transactionWithResult(true, txn -> + mailboxUpdateManager.getLocalUpdate(txn, c)); + if (mailboxUpdate instanceof MailboxUpdateWithMailbox) { + addContact(c, (MailboxUpdateWithMailbox) mailboxUpdate); + } else { + // Our own mailbox was concurrently unpaired. This worker will + // be destroyed soon, so we can stop here + LOG.info("Own mailbox was unpaired"); + } + } catch (NoSuchContactException e) { + // Contact was removed concurrently. Move on to the next update. + // Later we may process a removal update for this contact, which + // was never added to the mailbox's contact list. The removal API + // call should fail safely with a TolerableFailureException + LOG.info("No such contact"); + updateContactList(); + } catch (DbException e) { + logException(LOG, WARNING, e); + } + } + + @IoExecutor + private void addContact(ContactId c, MailboxUpdateWithMailbox withMailbox) { + MailboxProperties props = withMailbox.getMailboxProperties(); + MailboxContact contact = new MailboxContact(c, props.getAuthToken(), + requireNonNull(props.getInboxId()), + requireNonNull(props.getOutboxId())); + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST) return; + apiCall = mailboxApiCaller.retryWithBackoff(new SimpleApiCall(() -> + apiCallAddContact(contact))); + } + } + + @IoExecutor + private void apiCallAddContact(MailboxContact contact) + throws IOException, ApiException, TolerableFailureException { + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST) return; + } + LOG.info("Adding contact to remote contact list"); + mailboxApi.addContact(mailboxProperties, contact); + updateContactList(); + } + + @IoExecutor + private void removeContact(ContactId c) { + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST) return; + apiCall = mailboxApiCaller.retryWithBackoff(new SimpleApiCall(() -> + apiCallRemoveContact(c))); + } + } + + @IoExecutor + private void apiCallRemoveContact(ContactId c) + throws IOException, ApiException, TolerableFailureException { + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST) return; + } + LOG.info("Removing contact from remote contact list"); + mailboxApi.deleteContact(mailboxProperties, c); + updateContactList(); + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof ContactAddedEvent) { + LOG.info("Contact added"); + onContactAdded(((ContactAddedEvent) e).getContactId()); + } else if (e instanceof ContactRemovedEvent) { + LOG.info("Contact removed"); + onContactRemoved(((ContactRemovedEvent) e).getContactId()); + } + } + + @EventExecutor + private void onContactAdded(ContactId c) { + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST && + state != State.WAITING_FOR_CHANGES) { + return; + } + updates.add(new Update(true, c)); + if (state == State.WAITING_FOR_CHANGES) { + ioExecutor.execute(this::updateContactList); + } + } + } + + @EventExecutor + private void onContactRemoved(ContactId c) { + synchronized (lock) { + if (state != State.UPDATING_CONTACT_LIST && + state != State.WAITING_FOR_CHANGES) { + return; + } + updates.add(new Update(false, c)); + if (state == State.WAITING_FOR_CHANGES) { + ioExecutor.execute(this::updateContactList); + } + } + } + + private static class Update { + + private final boolean add; + private final ContactId contactId; + + private Update(boolean add, ContactId contactId) { + this.add = add; + this.contactId = contactId; + } + } +}