From 3d8826cef9b94f83d31994e22d48bfc4d02d37c8 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 25 Feb 2021 15:40:52 +0000 Subject: [PATCH] Add cleanup manager. --- .../bramble/api/cleanup/CleanupHook.java | 14 ++ .../bramble/api/cleanup/CleanupManager.java | 25 +++ .../event/CleanupTimerStartedEvent.java | 32 ++++ .../cleanup/event/MessagesCleanedUpEvent.java | 36 ++++ .../bramble/api/client/ClientHelper.java | 4 +- .../bramble/api/db/DatabaseComponent.java | 59 ++++++ .../bramble/BrambleCoreEagerSingletons.java | 4 + .../bramble/BrambleCoreModule.java | 2 + .../bramble/cleanup/CleanupManagerImpl.java | 181 ++++++++++++++++++ .../bramble/cleanup/CleanupModule.java | 29 +++ .../org/briarproject/bramble/db/Database.java | 50 ++++- .../bramble/db/DatabaseComponentImpl.java | 72 ++++++- .../briarproject/bramble/db/JdbcDatabase.java | 164 +++++++++++++++- .../bramble/db/Migration47_48.java | 47 +++++ .../bramble/db/DatabaseComponentImplTest.java | 99 +++++++++- .../bramble/db/JdbcDatabaseTest.java | 94 ++++++++- 16 files changed, 878 insertions(+), 34 deletions(-) create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupHook.java create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupManager.java create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/CleanupTimerStartedEvent.java create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/MessagesCleanedUpEvent.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupManagerImpl.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupModule.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupHook.java b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupHook.java new file mode 100644 index 000000000..85a396e11 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupHook.java @@ -0,0 +1,14 @@ +package org.briarproject.bramble.api.cleanup; + +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.db.Transaction; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.MessageId; + +@NotNullByDefault +public interface CleanupHook { + + boolean deleteMessage(Transaction txn, GroupId g, MessageId m) + throws DbException; +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupManager.java new file mode 100644 index 000000000..87764e62f --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/CleanupManager.java @@ -0,0 +1,25 @@ +package org.briarproject.bramble.api.cleanup; + +import org.briarproject.bramble.api.crypto.SecretKey; +import org.briarproject.bramble.api.lifecycle.LifecycleManager; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.ClientId; + +@NotNullByDefault +public interface CleanupManager { + + /** + * When scheduling a cleanup task we overshoot the deadline by this many + * milliseconds to reduce the number of tasks that need to be scheduled + * when messages have cleanup deadlines that are close together. + */ + long BATCH_DELAY_MS = 1000; + + /** + * Registers a hook to be called when messages are due for cleanup. + * This method should be called before + * {@link LifecycleManager#startServices(SecretKey)}. + */ + void registerCleanupHook(ClientId c, int majorVersion, + CleanupHook hook); +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/CleanupTimerStartedEvent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/CleanupTimerStartedEvent.java new file mode 100644 index 000000000..f941cd806 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/CleanupTimerStartedEvent.java @@ -0,0 +1,32 @@ +package org.briarproject.bramble.api.cleanup.event; + +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.MessageId; + +import javax.annotation.concurrent.Immutable; + +/** + * An event that is broadcast when a message's cleanup timer is started. + */ +@Immutable +@NotNullByDefault +public class CleanupTimerStartedEvent extends Event { + + private final MessageId messageId; + private final long cleanupDeadline; + + public CleanupTimerStartedEvent(MessageId messageId, + long cleanupDeadline) { + this.messageId = messageId; + this.cleanupDeadline = cleanupDeadline; + } + + public MessageId getMessageId() { + return messageId; + } + + public long getCleanupDeadline() { + return cleanupDeadline; + } +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/MessagesCleanedUpEvent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/MessagesCleanedUpEvent.java new file mode 100644 index 000000000..9e36d56e0 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/cleanup/event/MessagesCleanedUpEvent.java @@ -0,0 +1,36 @@ +package org.briarproject.bramble.api.cleanup.event; + +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.MessageId; + +import java.util.Collection; + +import javax.annotation.concurrent.Immutable; + +/** + * An event that is broadcast when one or more messages in a group are + * cleaned up. + */ +@Immutable +@NotNullByDefault +public class MessagesCleanedUpEvent extends Event { + + private final GroupId groupId; + private final Collection messageIds; + + public MessagesCleanedUpEvent(GroupId groupId, + Collection messageIds) { + this.groupId = groupId; + this.messageIds = messageIds; + } + + public GroupId getGroupId() { + return groupId; + } + + public Collection getMessageIds() { + return messageIds; + } +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/client/ClientHelper.java b/bramble-api/src/main/java/org/briarproject/bramble/api/client/ClientHelper.java index 396168d2a..c04454d30 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/client/ClientHelper.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/client/ClientHelper.java @@ -128,12 +128,12 @@ public interface ClientHelper { * group. */ ContactId getContactId(Transaction txn, GroupId contactGroupId) - throws DbException, FormatException; + throws DbException, FormatException; /** * Stores the given contact ID in the group metadata of the given contact * group. */ void setContactId(Transaction txn, GroupId contactGroupId, ContactId c) - throws DbException; + throws DbException; } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 3a8046bb6..cf6bfb61a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -41,6 +41,18 @@ import javax.annotation.Nullable; @NotNullByDefault public interface DatabaseComponent extends TransactionManager { + /** + * Return value for {@link #getNextCleanupDeadline(Transaction)} if + * no messages are scheduled to be deleted. + */ + long NO_CLEANUP_DEADLINE = -1; + + /** + * Return value for {@link #startCleanupTimer(Transaction, MessageId)} + * if the cleanup timer was not started. + */ + long TIMER_NOT_STARTED = -1; + /** * Opens the database and returns true if the database already existed. * @@ -324,6 +336,15 @@ public interface DatabaseComponent extends TransactionManager { Collection getMessagesToShare(Transaction txn) throws DbException; + /** + * Returns the IDs of any messages of any messages that are due for + * deletion, along with their group IDs. + *

+ * Read-only. + */ + Map getMessagesToDelete(Transaction txn) + throws DbException; + /** * Returns the metadata for all delivered messages in the given group. *

@@ -405,6 +426,15 @@ public interface DatabaseComponent extends TransactionManager { MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m) throws DbException; + /** + * Returns the next time (in milliseconds since the Unix epoch) when a + * message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE} + * if no messages are scheduled to be deleted. + *

+ * Read-only. + */ + long getNextCleanupDeadline(Transaction txn) throws DbException; + /* * Returns the next time (in milliseconds since the Unix epoch) when a * message is due to be sent to the given contact. The returned value may @@ -545,6 +575,13 @@ public interface DatabaseComponent extends TransactionManager { void removeTransportKeys(Transaction txn, TransportId t, KeySetId k) throws DbException; + /** + * Sets the cleanup timer duration for the given message. This does not + * start the message's cleanup timer. + */ + void setCleanupTimerDuration(Transaction txn, MessageId m, long duration) + throws DbException; + /** * Marks the given contact as verified. */ @@ -567,6 +604,12 @@ public interface DatabaseComponent extends TransactionManager { */ void setMessagePermanent(Transaction txn, MessageId m) throws DbException; + /** + * Marks the given message as not shared. This method is only meant for + * testing. + */ + void setMessageNotShared(Transaction txn, MessageId m) throws DbException; + /** * Marks the given message as shared. */ @@ -609,6 +652,22 @@ public interface DatabaseComponent extends TransactionManager { void setTransportKeysActive(Transaction txn, TransportId t, KeySetId k) throws DbException; + /** + * Starts the cleanup timer for the given message, if a timer duration + * has been set and the timer has not already been started. + * + * @return The cleanup deadline, or {@link #TIMER_NOT_STARTED} if no + * timer duration has been set for this message or its timer has already + * been started. + */ + long startCleanupTimer(Transaction txn, MessageId m) throws DbException; + + /** + * Stops the cleanup timer for the given message, if the timer has been + * started. + */ + void stopCleanupTimer(Transaction txn, MessageId m) throws DbException; + /** * Stores the given transport keys, deleting any keys they have replaced. */ diff --git a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java index ac13a5612..c256759ff 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java @@ -1,5 +1,6 @@ package org.briarproject.bramble; +import org.briarproject.bramble.cleanup.CleanupModule; import org.briarproject.bramble.contact.ContactModule; import org.briarproject.bramble.crypto.CryptoExecutorModule; import org.briarproject.bramble.db.DatabaseExecutorModule; @@ -14,6 +15,8 @@ import org.briarproject.bramble.versioning.VersioningModule; public interface BrambleCoreEagerSingletons { + void inject(CleanupModule.EagerSingletons init); + void inject(ContactModule.EagerSingletons init); void inject(CryptoExecutorModule.EagerSingletons init); @@ -39,6 +42,7 @@ public interface BrambleCoreEagerSingletons { class Helper { public static void injectEagerSingletons(BrambleCoreEagerSingletons c) { + c.inject(new CleanupModule.EagerSingletons()); c.inject(new ContactModule.EagerSingletons()); c.inject(new CryptoExecutorModule.EagerSingletons()); c.inject(new DatabaseExecutorModule.EagerSingletons()); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java index 8d34971d3..447bd5cb6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java @@ -1,5 +1,6 @@ package org.briarproject.bramble; +import org.briarproject.bramble.cleanup.CleanupModule; import org.briarproject.bramble.client.ClientModule; import org.briarproject.bramble.connection.ConnectionModule; import org.briarproject.bramble.contact.ContactModule; @@ -27,6 +28,7 @@ import org.briarproject.bramble.versioning.VersioningModule; import dagger.Module; @Module(includes = { + CleanupModule.class, ClientModule.class, ConnectionModule.class, ContactModule.class, diff --git a/bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupManagerImpl.java new file mode 100644 index 000000000..a66ef5b8d --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupManagerImpl.java @@ -0,0 +1,181 @@ +package org.briarproject.bramble.cleanup; + +import org.briarproject.bramble.api.cleanup.CleanupHook; +import org.briarproject.bramble.api.cleanup.CleanupManager; +import org.briarproject.bramble.api.cleanup.event.CleanupTimerStartedEvent; +import org.briarproject.bramble.api.cleanup.event.MessagesCleanedUpEvent; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.DatabaseExecutor; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.db.Transaction; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventListener; +import org.briarproject.bramble.api.lifecycle.Service; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.ClientId; +import org.briarproject.bramble.api.sync.Group; +import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.system.TaskScheduler; +import org.briarproject.bramble.api.versioning.ClientMajorVersion; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; + +import static java.lang.Math.max; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE; +import static org.briarproject.bramble.util.LogUtils.logException; + +@ThreadSafe +@NotNullByDefault +class CleanupManagerImpl implements CleanupManager, Service, EventListener { + + private static final Logger LOG = + getLogger(CleanupManagerImpl.class.getName()); + + private final Executor dbExecutor; + private final DatabaseComponent db; + private final TaskScheduler taskScheduler; + private final Clock clock; + private final Map hooks = + new ConcurrentHashMap<>(); + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Set pending = new HashSet<>(); + + @Inject + CleanupManagerImpl(@DatabaseExecutor Executor dbExecutor, + DatabaseComponent db, TaskScheduler taskScheduler, Clock clock) { + this.dbExecutor = dbExecutor; + this.db = db; + this.taskScheduler = taskScheduler; + this.clock = clock; + } + + @Override + public void registerCleanupHook(ClientId c, int majorVersion, + CleanupHook hook) { + hooks.put(new ClientMajorVersion(c, majorVersion), hook); + } + + @Override + public void startService() { + maybeScheduleTask(clock.currentTimeMillis()); + } + + @Override + public void stopService() { + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof CleanupTimerStartedEvent) { + CleanupTimerStartedEvent a = (CleanupTimerStartedEvent) e; + maybeScheduleTask(a.getCleanupDeadline()); + } + } + + private void maybeScheduleTask(long deadline) { + synchronized (lock) { + long minDeadline = Long.MAX_VALUE; + for (CleanupTask task : pending) { + if (task.deadline < minDeadline) minDeadline = task.deadline; + } + if (deadline < minDeadline) { + CleanupTask task = new CleanupTask(deadline); + pending.add(task); + scheduleTask(task); + } + } + } + + private void scheduleTask(CleanupTask task) { + long now = clock.currentTimeMillis(); + long delay = max(0, task.deadline - now + BATCH_DELAY_MS); + if (LOG.isLoggable(INFO)) { + LOG.info("Scheduling cleanup task in " + delay + " ms"); + } + taskScheduler.schedule(() -> deleteMessagesAndScheduleNextTask(task), + dbExecutor, delay, MILLISECONDS); + } + + private void deleteMessagesAndScheduleNextTask(CleanupTask task) { + try { + synchronized (lock) { + pending.remove(task); + } + long deadline = db.transactionWithResult(false, txn -> { + deleteMessages(txn); + return db.getNextCleanupDeadline(txn); + }); + if (deadline != NO_CLEANUP_DEADLINE) { + maybeScheduleTask(deadline); + } + } catch (DbException e) { + logException(LOG, WARNING, e); + } + } + + private void deleteMessages(Transaction txn) throws DbException { + Map clientCache = new HashMap<>(); + Map> deleted = new HashMap<>(); + Map ids = db.getMessagesToDelete(txn); + if (LOG.isLoggable(INFO)) LOG.info(ids.size() + " messages to delete"); + for (Entry e : ids.entrySet()) { + MessageId m = e.getKey(); + GroupId g = e.getValue(); + ClientMajorVersion cv = clientCache.get(g); + if (cv == null) { + Group group = db.getGroup(txn, g); + cv = new ClientMajorVersion(group.getClientId(), + group.getMajorVersion()); + clientCache.put(g, cv); + } + CleanupHook hook = hooks.get(cv); + if (hook == null) { + if (LOG.isLoggable(WARNING)) { + LOG.warning("No cleanup hook for " + cv); + } + } else if (hook.deleteMessage(txn, g, m)) { + Collection messageIds = deleted.get(g); + if (messageIds == null) { + messageIds = new ArrayList<>(); + deleted.put(g, messageIds); + } + messageIds.add(m); + } else { + LOG.info("Message was not deleted"); + } + } + for (Entry> e : deleted.entrySet()) { + txn.attach(new MessagesCleanedUpEvent(e.getKey(), e.getValue())); + } + } + + private static class CleanupTask { + + private final long deadline; + + private CleanupTask(long deadline) { + this.deadline = deadline; + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupModule.java b/bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupModule.java new file mode 100644 index 000000000..955dd3bdc --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/cleanup/CleanupModule.java @@ -0,0 +1,29 @@ +package org.briarproject.bramble.cleanup; + +import org.briarproject.bramble.api.cleanup.CleanupManager; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.lifecycle.LifecycleManager; + +import javax.inject.Inject; +import javax.inject.Singleton; + +import dagger.Module; +import dagger.Provides; + +@Module +public class CleanupModule { + + public static class EagerSingletons { + @Inject + CleanupManager cleanupManager; + } + + @Provides + @Singleton + CleanupManager provideCleanupManager(LifecycleManager lifecycleManager, + EventBus eventBus, CleanupManagerImpl cleanupManager) { + lifecycleManager.registerService(cleanupManager); + eventBus.addListener(cleanupManager); + return cleanupManager; + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index f6745af24..657cbf33f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -497,6 +497,24 @@ interface Database { */ Collection getMessagesToShare(T txn) throws DbException; + /** + * Returns the IDs of any messages of any messages that are due for + * deletion, along with their group IDs. + *

+ * Read-only. + */ + Map getMessagesToDelete(T txn) throws DbException; + + /** + * Returns the next time (in milliseconds since the Unix epoch) when a + * message is due to be deleted, or + * {@link DatabaseComponent#NO_CLEANUP_DEADLINE} if no messages are + * scheduled to be deleted. + *

+ * Read-only. + */ + long getNextCleanupDeadline(T txn) throws DbException; + /** * Returns the next time (in milliseconds since the Unix epoch) when a * message is due to be sent to the given contact. The returned value may @@ -606,8 +624,10 @@ interface Database { /** * Marks a message as having been seen by the given contact. + * + * @return True if the message was not already marked as seen. */ - void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException; + boolean raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException; /** * Removes a contact from the database. @@ -671,6 +691,13 @@ interface Database { */ void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException; + /** + * Sets the cleanup timer duration for the given message. This does not + * start the message's cleanup timer. + */ + void setCleanupTimerDuration(T txn, MessageId m, long duration) + throws DbException; + /** * Marks the given contact as verified. */ @@ -701,9 +728,10 @@ interface Database { void setMessagePermanent(T txn, MessageId m) throws DbException; /** - * Marks the given message as shared. + * Marks the given message as shared or not. */ - void setMessageShared(T txn, MessageId m) throws DbException; + void setMessageShared(T txn, MessageId m, boolean shared) + throws DbException; /** * Sets the validation and delivery state of the given message. @@ -730,6 +758,22 @@ interface Database { void setTransportKeysActive(T txn, TransportId t, KeySetId k) throws DbException; + /** + * Starts the cleanup timer for the given message, if a timer duration + * has been set and the timer has not already been started. + * + * @return The cleanup deadline, or + * {@link DatabaseComponent#TIMER_NOT_STARTED} if no timer duration has + * been set for this message or its timer has already been started. + */ + long startCleanupTimer(T txn, MessageId m) throws DbException; + + /** + * Stops the cleanup timer for the given message, if the timer has been + * started. + */ + void stopCleanupTimer(T txn, MessageId m) throws DbException; + /** * Updates the transmission count, expiry time and estimated time of arrival * of the given message with respect to the given contact, using the latency diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 6bc18af7b..8ba081520 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -1,5 +1,6 @@ package org.briarproject.bramble.db; +import org.briarproject.bramble.api.cleanup.event.CleanupTimerStartedEvent; import org.briarproject.bramble.api.contact.Contact; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContact; @@ -606,6 +607,13 @@ class DatabaseComponentImpl implements DatabaseComponent { return db.getMessagesToShare(txn); } + @Override + public Map getMessagesToDelete(Transaction transaction) + throws DbException { + T txn = unbox(transaction); + return db.getMessagesToDelete(txn); + } + @Override public Map getMessageMetadata(Transaction transaction, GroupId g) throws DbException { @@ -701,6 +709,13 @@ class DatabaseComponentImpl implements DatabaseComponent { return db.getMessageDependents(txn, m); } + @Override + public long getNextCleanupDeadline(Transaction transaction) + throws DbException { + T txn = unbox(transaction); + return db.getNextCleanupDeadline(txn); + } + @Override public long getNextSendTime(Transaction transaction, ContactId c) throws DbException { @@ -804,8 +819,15 @@ class DatabaseComponentImpl implements DatabaseComponent { Collection acked = new ArrayList<>(); for (MessageId m : a.getMessageIds()) { if (db.containsVisibleMessage(txn, c, m)) { - db.raiseSeenFlag(txn, c, m); - acked.add(m); + if (db.raiseSeenFlag(txn, c, m)) { + // This is the first time the message has been acked + long deadline = db.startCleanupTimer(txn, m); + if (deadline != TIMER_NOT_STARTED) { + transaction.attach(new CleanupTimerStartedEvent(m, + deadline)); + } + acked.add(m); + } } } if (acked.size() > 0) { @@ -961,6 +983,16 @@ class DatabaseComponentImpl implements DatabaseComponent { db.removeTransportKeys(txn, t, k); } + @Override + public void setCleanupTimerDuration(Transaction transaction, MessageId m, + long duration) throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsMessage(txn, m)) + throw new NoSuchMessageException(); + db.setCleanupTimerDuration(txn, m, duration); + } + @Override public void setContactVerified(Transaction transaction, ContactId c) throws DbException { @@ -1010,6 +1042,16 @@ class DatabaseComponentImpl implements DatabaseComponent { db.setMessagePermanent(txn, m); } + @Override + public void setMessageNotShared(Transaction transaction, MessageId m) + throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsMessage(txn, m)) + throw new NoSuchMessageException(); + db.setMessageShared(txn, m, false); + } + @Override public void setMessageShared(Transaction transaction, MessageId m) throws DbException { @@ -1019,7 +1061,7 @@ class DatabaseComponentImpl implements DatabaseComponent { throw new NoSuchMessageException(); if (db.getMessageState(txn, m) != DELIVERED) throw new IllegalArgumentException("Shared undelivered message"); - db.setMessageShared(txn, m); + db.setMessageShared(txn, m, true); transaction.attach(new MessageSharedEvent(m)); } @@ -1091,6 +1133,30 @@ class DatabaseComponentImpl implements DatabaseComponent { db.setTransportKeysActive(txn, t, k); } + @Override + public long startCleanupTimer(Transaction transaction, MessageId m) + throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsMessage(txn, m)) + throw new NoSuchMessageException(); + long deadline = db.startCleanupTimer(txn, m); + if (deadline != TIMER_NOT_STARTED) { + transaction.attach(new CleanupTimerStartedEvent(m, deadline)); + } + return deadline; + } + + @Override + public void stopCleanupTimer(Transaction transaction, MessageId m) + throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsMessage(txn, m)) + throw new NoSuchMessageException(); + db.stopCleanupTimer(txn, m); + } + @Override public void updateTransportKeys(Transaction transaction, Collection keys) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index c56b48e8d..17dca37d6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -72,6 +72,8 @@ import static java.util.Arrays.asList; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE; +import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED; import static org.briarproject.bramble.api.db.Metadata.REMOVE; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; @@ -98,7 +100,7 @@ import static org.briarproject.bramble.util.LogUtils.now; abstract class JdbcDatabase implements Database { // Package access for testing - static final int CODE_SCHEMA_VERSION = 47; + static final int CODE_SCHEMA_VERSION = 48; // Time period offsets for incoming transport keys private static final int OFFSET_PREV = -1; @@ -180,6 +182,11 @@ abstract class JdbcDatabase implements Database { + " state INT NOT NULL," + " shared BOOLEAN NOT NULL," + " temporary BOOLEAN NOT NULL," + // Null if no timer duration has been set + + " cleanupTimerDuration BIGINT," + // Null if no timer duration has been set or the timer + // hasn't started + + " cleanupDeadline BIGINT," + " length INT NOT NULL," + " raw BLOB," // Null if message has been deleted + " PRIMARY KEY (messageId)," @@ -336,6 +343,10 @@ abstract class JdbcDatabase implements Database { "CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp" + " ON statuses (contactId, timestamp)"; + private static final String INDEX_MESSAGES_BY_CLEANUP_DEADLINE = + "CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline" + + " ON messages (cleanupDeadline)"; + private static final Logger LOG = getLogger(JdbcDatabase.class.getName()); @@ -463,7 +474,8 @@ abstract class JdbcDatabase implements Database { new Migration43_44(dbTypes), new Migration44_45(), new Migration45_46(), - new Migration46_47(dbTypes) + new Migration46_47(dbTypes), + new Migration47_48() ); } @@ -531,6 +543,7 @@ abstract class JdbcDatabase implements Database { s.executeUpdate(INDEX_MESSAGE_DEPENDENCIES_BY_DEPENDENCY_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP); + s.executeUpdate(INDEX_MESSAGES_BY_CLEANUP_DEADLINE); s.close(); } catch (SQLException e) { tryToClose(s, LOG, WARNING); @@ -1290,7 +1303,9 @@ abstract class JdbcDatabase implements Database { public void deleteMessage(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; try { - String sql = "UPDATE messages SET raw = NULL WHERE messageId = ?"; + String sql = "UPDATE messages" + + " SET raw = NULL, cleanupDeadline = NULL" + + " WHERE messageId = ?"; ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); int affected = ps.executeUpdate(); @@ -1769,7 +1784,6 @@ abstract class JdbcDatabase implements Database { // Return early if there are no matches if (intersection.isEmpty()) return Collections.emptySet(); } - if (intersection == null) throw new AssertionError(); return intersection; } catch (SQLException e) { tryToClose(rs, LOG, WARNING); @@ -2226,6 +2240,33 @@ abstract class JdbcDatabase implements Database { } } + @Override + public Map getMessagesToDelete(Connection txn) + throws DbException { + long now = clock.currentTimeMillis(); + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT messageId, groupId FROM messages" + + " WHERE cleanupDeadline <= ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, now); + rs = ps.executeQuery(); + Map ids = new HashMap<>(); + while (rs.next()) { + ids.put(new MessageId(rs.getBytes(1)), + new GroupId(rs.getBytes(2))); + } + rs.close(); + ps.close(); + return ids; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public long getNextSendTime(Connection txn, ContactId c) throws DbException { @@ -2256,6 +2297,31 @@ abstract class JdbcDatabase implements Database { } } + @Override + public long getNextCleanupDeadline(Connection txn) throws DbException { + Statement s = null; + ResultSet rs = null; + try { + String sql = "SELECT cleanupDeadline FROM messages" + + " WHERE cleanupDeadline IS NOT NULL" + + " ORDER BY cleanupDeadline LIMIT 1"; + s = txn.createStatement(); + rs = s.executeQuery(sql); + long nextDeadline = NO_CLEANUP_DEADLINE; + if (rs.next()) { + nextDeadline = rs.getLong(1); + if (rs.next()) throw new AssertionError(); + } + rs.close(); + s.close(); + return nextDeadline; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(s, LOG, WARNING); + throw new DbException(e); + } + } + @Override public PendingContact getPendingContact(Connection txn, PendingContactId p) throws DbException { @@ -2776,7 +2842,7 @@ abstract class JdbcDatabase implements Database { } @Override - public void raiseSeenFlag(Connection txn, ContactId c, MessageId m) + public boolean raiseSeenFlag(Connection txn, ContactId c, MessageId m) throws DbException { PreparedStatement ps = null; try { @@ -2788,6 +2854,7 @@ abstract class JdbcDatabase implements Database { int affected = ps.executeUpdate(); if (affected < 0 || affected > 1) throw new DbStateException(); ps.close(); + return affected == 1; } catch (SQLException e) { tryToClose(ps, LOG, WARNING); throw new DbException(e); @@ -3021,6 +3088,25 @@ abstract class JdbcDatabase implements Database { } } + @Override + public void setCleanupTimerDuration(Connection txn, MessageId m, + long duration) throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE messages SET cleanupTimerDuration = ?" + + " WHERE messageId = ? AND cleanupTimerDuration IS NULL"; + ps = txn.prepareStatement(sql); + ps.setLong(1, duration); + ps.setBytes(2, m.getBytes()); + int affected = ps.executeUpdate(); + if (affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + } catch (SQLException e) { + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public void setContactVerified(Connection txn, ContactId c) throws DbException { @@ -3128,22 +3214,24 @@ abstract class JdbcDatabase implements Database { } @Override - public void setMessageShared(Connection txn, MessageId m) + public void setMessageShared(Connection txn, MessageId m, boolean shared) throws DbException { PreparedStatement ps = null; try { - String sql = "UPDATE messages SET shared = TRUE" + String sql = "UPDATE messages SET shared = ?" + " WHERE messageId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); + ps.setBoolean(1, shared); + ps.setBytes(2, m.getBytes()); int affected = ps.executeUpdate(); if (affected < 0 || affected > 1) throw new DbStateException(); ps.close(); // Update denormalised column in statuses - sql = "UPDATE statuses SET messageShared = TRUE" + sql = "UPDATE statuses SET messageShared = ?" + " WHERE messageId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); + ps.setBoolean(1, shared); + ps.setBytes(2, m.getBytes()); affected = ps.executeUpdate(); if (affected < 0) throw new DbStateException(); ps.close(); @@ -3272,6 +3360,62 @@ abstract class JdbcDatabase implements Database { } } + @Override + public long startCleanupTimer(Connection txn, MessageId m) + throws DbException { + long now = clock.currentTimeMillis(); + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "UPDATE messages" + + " SET cleanupDeadline = ? + cleanupTimerDuration" + + " WHERE messageId = ?" + + " AND cleanupTimerDuration IS NOT NULL" + + " AND cleanupDeadline IS NULL"; + ps = txn.prepareStatement(sql); + ps.setLong(1, now); + ps.setBytes(2, m.getBytes()); + int affected = ps.executeUpdate(); + if (affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + if (affected == 0) return TIMER_NOT_STARTED; + sql = "SELECT cleanupDeadline FROM messages WHERE messageId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + rs = ps.executeQuery(); + if (!rs.next()) throw new DbStateException(); + long deadline = rs.getLong(1); + if (rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + return deadline; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + + @Override + public void stopCleanupTimer(Connection txn, MessageId m) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "UPDATE messages SET cleanupDeadline = NULL" + + " WHERE messageId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + int affected = ps.executeUpdate(); + if (affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public void updateExpiryTimeAndEta(Connection txn, ContactId c, MessageId m, int maxLatency) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java new file mode 100644 index 000000000..690185794 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java @@ -0,0 +1,47 @@ +package org.briarproject.bramble.db; + +import org.briarproject.bramble.api.db.DbException; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.logging.Logger; + +import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.db.JdbcUtils.tryToClose; + +class Migration47_48 implements Migration { + + private static final Logger LOG = getLogger(Migration47_48.class.getName()); + + @Override + public int getStartVersion() { + return 47; + } + + @Override + public int getEndVersion() { + return 48; + } + + @Override + public void migrate(Connection txn) throws DbException { + Statement s = null; + try { + s = txn.createStatement(); + // Null if no timer duration has been set + s.execute("ALTER TABLE messages" + + " ADD COLUMN cleanupTimerDuration BIGINT"); + // Null if no timer duration has been set or the timer + // hasn't started + s.execute("ALTER TABLE messages" + + " ADD COLUMN cleanupDeadline BIGINT"); + s.execute("CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline" + + " ON messages (cleanupDeadline)"); + } catch (SQLException e) { + tryToClose(s, LOG, WARNING); + throw new DbException(e); + } + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index dd6c4efc0..39615b2e5 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -1,5 +1,6 @@ package org.briarproject.bramble.db; +import org.briarproject.bramble.api.cleanup.event.CleanupTimerStartedEvent; import org.briarproject.bramble.api.contact.Contact; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.PendingContactId; @@ -69,6 +70,8 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static java.util.concurrent.TimeUnit.HOURS; +import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE; @@ -610,11 +613,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { throws Exception { context.checking(new Expectations() {{ // Check whether the message is in the DB (which it's not) - exactly(12).of(database).startTransaction(); + exactly(15).of(database).startTransaction(); will(returnValue(txn)); - exactly(12).of(database).containsMessage(txn, messageId); + exactly(15).of(database).containsMessage(txn, messageId); will(returnValue(false)); - exactly(12).of(database).abortTransaction(txn); + exactly(15).of(database).abortTransaction(txn); // Allow other checks to pass allowing(database).containsContact(txn, contactId); will(returnValue(true)); @@ -639,7 +642,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { } try { - db.transaction(false, transaction -> + db.transaction(true, transaction -> db.getMessage(transaction, messageId)); fail(); } catch (NoSuchMessageException expected) { @@ -647,7 +650,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { } try { - db.transaction(false, transaction -> + db.transaction(true, transaction -> db.getMessageMetadata(transaction, messageId)); fail(); } catch (NoSuchMessageException expected) { @@ -655,7 +658,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { } try { - db.transaction(false, transaction -> + db.transaction(true, transaction -> db.getMessageState(transaction, messageId)); fail(); } catch (NoSuchMessageException expected) { @@ -663,7 +666,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { } try { - db.transaction(false, transaction -> + db.transaction(true, transaction -> db.getMessageStatus(transaction, contactId, messageId)); fail(); } catch (NoSuchMessageException expected) { @@ -678,6 +681,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { // Expected } + try { + db.transaction(false, transaction -> + db.setCleanupTimerDuration(transaction, message.getId(), + HOURS.toMillis(1))); + fail(); + } catch (NoSuchMessageException expected) { + // Expected + } + try { db.transaction(false, transaction -> db.setMessagePermanent(transaction, message.getId())); @@ -703,7 +715,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { } try { - db.transaction(false, transaction -> + db.transaction(true, transaction -> db.getMessageDependencies(transaction, messageId)); fail(); } catch (NoSuchMessageException expected) { @@ -711,12 +723,28 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { } try { - db.transaction(false, transaction -> + db.transaction(true, transaction -> db.getMessageDependents(transaction, messageId)); fail(); } catch (NoSuchMessageException expected) { // Expected } + + try { + db.transaction(false, transaction -> + db.startCleanupTimer(transaction, messageId)); + fail(); + } catch (NoSuchMessageException expected) { + // Expected + } + + try { + db.transaction(false, transaction -> + db.stopCleanupTimer(transaction, messageId)); + fail(); + } catch (NoSuchMessageException expected) { + // Expected + } } @Test @@ -997,6 +1025,9 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { oneOf(database).containsVisibleMessage(txn, contactId, messageId); will(returnValue(true)); oneOf(database).raiseSeenFlag(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).startCleanupTimer(txn, messageId); + will(returnValue(TIMER_NOT_STARTED)); // No cleanup duration was set oneOf(database).commitTransaction(txn); oneOf(eventBus).broadcast(with(any(MessagesAckedEvent.class))); }}); @@ -1009,6 +1040,56 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { }); } + @Test + public void testReceiveDuplicateAck() throws Exception { + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).raiseSeenFlag(txn, contactId, messageId); + will(returnValue(false)); // Already acked + oneOf(database).commitTransaction(txn); + }}); + DatabaseComponent db = createDatabaseComponent(database, eventBus, + eventExecutor, shutdownManager); + + db.transaction(false, transaction -> { + Ack a = new Ack(singletonList(messageId)); + db.receiveAck(transaction, contactId, a); + }); + } + + @Test + public void testReceiveAckWithCleanupTimer() throws Exception { + long deadline = System.currentTimeMillis(); + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).raiseSeenFlag(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).startCleanupTimer(txn, messageId); + will(returnValue(deadline)); + oneOf(database).commitTransaction(txn); + oneOf(eventBus).broadcast(with(any( + CleanupTimerStartedEvent.class))); + oneOf(eventBus).broadcast(with(any(MessagesAckedEvent.class))); + }}); + DatabaseComponent db = createDatabaseComponent(database, eventBus, + eventExecutor, shutdownManager); + + db.transaction(false, transaction -> { + Ack a = new Ack(singletonList(messageId)); + db.receiveAck(transaction, contactId, a); + }); + } + @Test public void testReceiveMessage() throws Exception { context.checking(new Expectations() {{ diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index ac4231216..3fdd13735 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -53,10 +53,11 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE; +import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED; import static org.briarproject.bramble.api.db.Metadata.REMOVE; import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; @@ -351,7 +352,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); // Sharing the message should make it sendable - db.setMessageShared(txn, messageId); + db.setMessageShared(txn, messageId, true); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); @@ -631,8 +632,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The group should not be visible to the contact assertEquals(INVISIBLE, db.getGroupVisibility(txn, contactId, groupId)); - assertEquals(emptyMap(), - db.getGroupVisibility(txn, groupId)); + assertTrue(db.getGroupVisibility(txn, groupId).isEmpty()); // Make the group visible to the contact db.addGroupVisibility(txn, contactId, groupId, false); @@ -655,8 +655,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // Make the group invisible again db.removeGroupVisibility(txn, contactId, groupId); assertEquals(INVISIBLE, db.getGroupVisibility(txn, contactId, groupId)); - assertEquals(emptyMap(), - db.getGroupVisibility(txn, groupId)); + assertTrue(db.getGroupVisibility(txn, groupId).isEmpty()); db.commitTransaction(txn); db.close(); @@ -2040,7 +2039,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); // Share the message - now it should be sendable immediately - db.setMessageShared(txn, messageId); + db.setMessageShared(txn, messageId, true); assertEquals(0, db.getNextSendTime(txn, contactId)); // Mark the message as requested - it should still be sendable @@ -2347,6 +2346,87 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { db.close(); } + @Test + public void testCleanupTimer() throws Exception { + long duration = 60_000; + long now = System.currentTimeMillis(); + AtomicLong time = new AtomicLong(now); + Database db = + open(false, new TestMessageFactory(), new SettableClock(time)); + Connection txn = db.startTransaction(); + + // No messages should be due or scheduled for deletion + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn)); + + // Add a group and a message + db.addGroup(txn, group); + db.addMessage(txn, message, DELIVERED, false, false, null); + + // No messages should be due or scheduled for deletion + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn)); + + // Set the message's cleanup timer duration + db.setCleanupTimerDuration(txn, messageId, duration); + + // No messages should be due or scheduled for deletion + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn)); + + // Start the message's cleanup timer + assertEquals(now + duration, db.startCleanupTimer(txn, messageId)); + + // The timer can't be started again + assertEquals(TIMER_NOT_STARTED, db.startCleanupTimer(txn, messageId)); + + // No messages should be due for deletion, but the message should be + // scheduled for deletion + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(now + duration, db.getNextCleanupDeadline(txn)); + + // Stop the timer + db.stopCleanupTimer(txn, messageId); + + // No messages should be due or scheduled for deletion + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn)); + + // Start the timer again + assertEquals(now + duration, db.startCleanupTimer(txn, messageId)); + + // No messages should be due for deletion, but the message should be + // scheduled for deletion + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(now + duration, db.getNextCleanupDeadline(txn)); + + // 1 ms before the timer expires, no messages should be due for + // deletion but the message should be scheduled for deletion + time.set(now + duration - 1); + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(now + duration, db.getNextCleanupDeadline(txn)); + + // When the timer expires, the message should be due and scheduled for + // deletion + time.set(now + duration); + assertEquals(singletonMap(messageId, groupId), + db.getMessagesToDelete(txn)); + assertEquals(now + duration, db.getNextCleanupDeadline(txn)); + + // 1 ms after the timer expires, the message should be due and + // scheduled for deletion + time.set(now + duration + 1); + assertEquals(singletonMap(messageId, groupId), + db.getMessagesToDelete(txn)); + assertEquals(now + duration, db.getNextCleanupDeadline(txn)); + + // Once the message has been deleted, it should no longer be due + // or scheduled for deletion + db.deleteMessage(txn, messageId); + assertTrue(db.getMessagesToDelete(txn).isEmpty()); + assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn)); + } + private Database open(boolean resume) throws Exception { return open(resume, new TestMessageFactory(), new SystemClock()); }