Add cleanup manager.

This commit is contained in:
akwizgran
2021-02-25 15:40:52 +00:00
committed by Torsten Grote
parent 6113b4ebee
commit 3d8826cef9
16 changed files with 878 additions and 34 deletions

View File

@@ -0,0 +1,14 @@
package org.briarproject.bramble.api.cleanup;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId;
@NotNullByDefault
public interface CleanupHook {
boolean deleteMessage(Transaction txn, GroupId g, MessageId m)
throws DbException;
}

View File

@@ -0,0 +1,25 @@
package org.briarproject.bramble.api.cleanup;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.ClientId;
@NotNullByDefault
public interface CleanupManager {
/**
* When scheduling a cleanup task we overshoot the deadline by this many
* milliseconds to reduce the number of tasks that need to be scheduled
* when messages have cleanup deadlines that are close together.
*/
long BATCH_DELAY_MS = 1000;
/**
* Registers a hook to be called when messages are due for cleanup.
* This method should be called before
* {@link LifecycleManager#startServices(SecretKey)}.
*/
void registerCleanupHook(ClientId c, int majorVersion,
CleanupHook hook);
}

View File

@@ -0,0 +1,32 @@
package org.briarproject.bramble.api.cleanup.event;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageId;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when a message's cleanup timer is started.
*/
@Immutable
@NotNullByDefault
public class CleanupTimerStartedEvent extends Event {
private final MessageId messageId;
private final long cleanupDeadline;
public CleanupTimerStartedEvent(MessageId messageId,
long cleanupDeadline) {
this.messageId = messageId;
this.cleanupDeadline = cleanupDeadline;
}
public MessageId getMessageId() {
return messageId;
}
public long getCleanupDeadline() {
return cleanupDeadline;
}
}

View File

@@ -0,0 +1,36 @@
package org.briarproject.bramble.api.cleanup.event;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId;
import java.util.Collection;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when one or more messages in a group are
* cleaned up.
*/
@Immutable
@NotNullByDefault
public class MessagesCleanedUpEvent extends Event {
private final GroupId groupId;
private final Collection<MessageId> messageIds;
public MessagesCleanedUpEvent(GroupId groupId,
Collection<MessageId> messageIds) {
this.groupId = groupId;
this.messageIds = messageIds;
}
public GroupId getGroupId() {
return groupId;
}
public Collection<MessageId> getMessageIds() {
return messageIds;
}
}

View File

@@ -128,12 +128,12 @@ public interface ClientHelper {
* group.
*/
ContactId getContactId(Transaction txn, GroupId contactGroupId)
throws DbException, FormatException;
throws DbException, FormatException;
/**
* Stores the given contact ID in the group metadata of the given contact
* group.
*/
void setContactId(Transaction txn, GroupId contactGroupId, ContactId c)
throws DbException;
throws DbException;
}

View File

@@ -41,6 +41,18 @@ import javax.annotation.Nullable;
@NotNullByDefault
public interface DatabaseComponent extends TransactionManager {
/**
* Return value for {@link #getNextCleanupDeadline(Transaction)} if
* no messages are scheduled to be deleted.
*/
long NO_CLEANUP_DEADLINE = -1;
/**
* Return value for {@link #startCleanupTimer(Transaction, MessageId)}
* if the cleanup timer was not started.
*/
long TIMER_NOT_STARTED = -1;
/**
* Opens the database and returns true if the database already existed.
*
@@ -324,6 +336,15 @@ public interface DatabaseComponent extends TransactionManager {
Collection<MessageId> getMessagesToShare(Transaction txn)
throws DbException;
/**
* Returns the IDs of any messages of any messages that are due for
* deletion, along with their group IDs.
* <p/>
* Read-only.
*/
Map<MessageId, GroupId> getMessagesToDelete(Transaction txn)
throws DbException;
/**
* Returns the metadata for all delivered messages in the given group.
* <p/>
@@ -405,6 +426,15 @@ public interface DatabaseComponent extends TransactionManager {
MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m)
throws DbException;
/**
* Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE}
* if no messages are scheduled to be deleted.
* <p/>
* Read-only.
*/
long getNextCleanupDeadline(Transaction txn) throws DbException;
/*
* Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be sent to the given contact. The returned value may
@@ -545,6 +575,13 @@ public interface DatabaseComponent extends TransactionManager {
void removeTransportKeys(Transaction txn, TransportId t, KeySetId k)
throws DbException;
/**
* Sets the cleanup timer duration for the given message. This does not
* start the message's cleanup timer.
*/
void setCleanupTimerDuration(Transaction txn, MessageId m, long duration)
throws DbException;
/**
* Marks the given contact as verified.
*/
@@ -567,6 +604,12 @@ public interface DatabaseComponent extends TransactionManager {
*/
void setMessagePermanent(Transaction txn, MessageId m) throws DbException;
/**
* Marks the given message as not shared. This method is only meant for
* testing.
*/
void setMessageNotShared(Transaction txn, MessageId m) throws DbException;
/**
* Marks the given message as shared.
*/
@@ -609,6 +652,22 @@ public interface DatabaseComponent extends TransactionManager {
void setTransportKeysActive(Transaction txn, TransportId t, KeySetId k)
throws DbException;
/**
* Starts the cleanup timer for the given message, if a timer duration
* has been set and the timer has not already been started.
*
* @return The cleanup deadline, or {@link #TIMER_NOT_STARTED} if no
* timer duration has been set for this message or its timer has already
* been started.
*/
long startCleanupTimer(Transaction txn, MessageId m) throws DbException;
/**
* Stops the cleanup timer for the given message, if the timer has been
* started.
*/
void stopCleanupTimer(Transaction txn, MessageId m) throws DbException;
/**
* Stores the given transport keys, deleting any keys they have replaced.
*/

View File

@@ -1,5 +1,6 @@
package org.briarproject.bramble;
import org.briarproject.bramble.cleanup.CleanupModule;
import org.briarproject.bramble.contact.ContactModule;
import org.briarproject.bramble.crypto.CryptoExecutorModule;
import org.briarproject.bramble.db.DatabaseExecutorModule;
@@ -14,6 +15,8 @@ import org.briarproject.bramble.versioning.VersioningModule;
public interface BrambleCoreEagerSingletons {
void inject(CleanupModule.EagerSingletons init);
void inject(ContactModule.EagerSingletons init);
void inject(CryptoExecutorModule.EagerSingletons init);
@@ -39,6 +42,7 @@ public interface BrambleCoreEagerSingletons {
class Helper {
public static void injectEagerSingletons(BrambleCoreEagerSingletons c) {
c.inject(new CleanupModule.EagerSingletons());
c.inject(new ContactModule.EagerSingletons());
c.inject(new CryptoExecutorModule.EagerSingletons());
c.inject(new DatabaseExecutorModule.EagerSingletons());

View File

@@ -1,5 +1,6 @@
package org.briarproject.bramble;
import org.briarproject.bramble.cleanup.CleanupModule;
import org.briarproject.bramble.client.ClientModule;
import org.briarproject.bramble.connection.ConnectionModule;
import org.briarproject.bramble.contact.ContactModule;
@@ -27,6 +28,7 @@ import org.briarproject.bramble.versioning.VersioningModule;
import dagger.Module;
@Module(includes = {
CleanupModule.class,
ClientModule.class,
ConnectionModule.class,
ContactModule.class,

View File

@@ -0,0 +1,181 @@
package org.briarproject.bramble.cleanup;
import org.briarproject.bramble.api.cleanup.CleanupHook;
import org.briarproject.bramble.api.cleanup.CleanupManager;
import org.briarproject.bramble.api.cleanup.event.CleanupTimerStartedEvent;
import org.briarproject.bramble.api.cleanup.event.MessagesCleanedUpEvent;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.Service;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.ClientId;
import org.briarproject.bramble.api.sync.Group;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.versioning.ClientMajorVersion;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.lang.Math.max;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE;
import static org.briarproject.bramble.util.LogUtils.logException;
@ThreadSafe
@NotNullByDefault
class CleanupManagerImpl implements CleanupManager, Service, EventListener {
private static final Logger LOG =
getLogger(CleanupManagerImpl.class.getName());
private final Executor dbExecutor;
private final DatabaseComponent db;
private final TaskScheduler taskScheduler;
private final Clock clock;
private final Map<ClientMajorVersion, CleanupHook> hooks =
new ConcurrentHashMap<>();
private final Object lock = new Object();
@GuardedBy("lock")
private final Set<CleanupTask> pending = new HashSet<>();
@Inject
CleanupManagerImpl(@DatabaseExecutor Executor dbExecutor,
DatabaseComponent db, TaskScheduler taskScheduler, Clock clock) {
this.dbExecutor = dbExecutor;
this.db = db;
this.taskScheduler = taskScheduler;
this.clock = clock;
}
@Override
public void registerCleanupHook(ClientId c, int majorVersion,
CleanupHook hook) {
hooks.put(new ClientMajorVersion(c, majorVersion), hook);
}
@Override
public void startService() {
maybeScheduleTask(clock.currentTimeMillis());
}
@Override
public void stopService() {
}
@Override
public void eventOccurred(Event e) {
if (e instanceof CleanupTimerStartedEvent) {
CleanupTimerStartedEvent a = (CleanupTimerStartedEvent) e;
maybeScheduleTask(a.getCleanupDeadline());
}
}
private void maybeScheduleTask(long deadline) {
synchronized (lock) {
long minDeadline = Long.MAX_VALUE;
for (CleanupTask task : pending) {
if (task.deadline < minDeadline) minDeadline = task.deadline;
}
if (deadline < minDeadline) {
CleanupTask task = new CleanupTask(deadline);
pending.add(task);
scheduleTask(task);
}
}
}
private void scheduleTask(CleanupTask task) {
long now = clock.currentTimeMillis();
long delay = max(0, task.deadline - now + BATCH_DELAY_MS);
if (LOG.isLoggable(INFO)) {
LOG.info("Scheduling cleanup task in " + delay + " ms");
}
taskScheduler.schedule(() -> deleteMessagesAndScheduleNextTask(task),
dbExecutor, delay, MILLISECONDS);
}
private void deleteMessagesAndScheduleNextTask(CleanupTask task) {
try {
synchronized (lock) {
pending.remove(task);
}
long deadline = db.transactionWithResult(false, txn -> {
deleteMessages(txn);
return db.getNextCleanupDeadline(txn);
});
if (deadline != NO_CLEANUP_DEADLINE) {
maybeScheduleTask(deadline);
}
} catch (DbException e) {
logException(LOG, WARNING, e);
}
}
private void deleteMessages(Transaction txn) throws DbException {
Map<GroupId, ClientMajorVersion> clientCache = new HashMap<>();
Map<GroupId, Collection<MessageId>> deleted = new HashMap<>();
Map<MessageId, GroupId> ids = db.getMessagesToDelete(txn);
if (LOG.isLoggable(INFO)) LOG.info(ids.size() + " messages to delete");
for (Entry<MessageId, GroupId> e : ids.entrySet()) {
MessageId m = e.getKey();
GroupId g = e.getValue();
ClientMajorVersion cv = clientCache.get(g);
if (cv == null) {
Group group = db.getGroup(txn, g);
cv = new ClientMajorVersion(group.getClientId(),
group.getMajorVersion());
clientCache.put(g, cv);
}
CleanupHook hook = hooks.get(cv);
if (hook == null) {
if (LOG.isLoggable(WARNING)) {
LOG.warning("No cleanup hook for " + cv);
}
} else if (hook.deleteMessage(txn, g, m)) {
Collection<MessageId> messageIds = deleted.get(g);
if (messageIds == null) {
messageIds = new ArrayList<>();
deleted.put(g, messageIds);
}
messageIds.add(m);
} else {
LOG.info("Message was not deleted");
}
}
for (Entry<GroupId, Collection<MessageId>> e : deleted.entrySet()) {
txn.attach(new MessagesCleanedUpEvent(e.getKey(), e.getValue()));
}
}
private static class CleanupTask {
private final long deadline;
private CleanupTask(long deadline) {
this.deadline = deadline;
}
}
}

View File

@@ -0,0 +1,29 @@
package org.briarproject.bramble.cleanup;
import org.briarproject.bramble.api.cleanup.CleanupManager;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import javax.inject.Inject;
import javax.inject.Singleton;
import dagger.Module;
import dagger.Provides;
@Module
public class CleanupModule {
public static class EagerSingletons {
@Inject
CleanupManager cleanupManager;
}
@Provides
@Singleton
CleanupManager provideCleanupManager(LifecycleManager lifecycleManager,
EventBus eventBus, CleanupManagerImpl cleanupManager) {
lifecycleManager.registerService(cleanupManager);
eventBus.addListener(cleanupManager);
return cleanupManager;
}
}

View File

@@ -497,6 +497,24 @@ interface Database<T> {
*/
Collection<MessageId> getMessagesToShare(T txn) throws DbException;
/**
* Returns the IDs of any messages of any messages that are due for
* deletion, along with their group IDs.
* <p/>
* Read-only.
*/
Map<MessageId, GroupId> getMessagesToDelete(T txn) throws DbException;
/**
* Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be deleted, or
* {@link DatabaseComponent#NO_CLEANUP_DEADLINE} if no messages are
* scheduled to be deleted.
* <p/>
* Read-only.
*/
long getNextCleanupDeadline(T txn) throws DbException;
/**
* Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be sent to the given contact. The returned value may
@@ -606,8 +624,10 @@ interface Database<T> {
/**
* Marks a message as having been seen by the given contact.
*
* @return True if the message was not already marked as seen.
*/
void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException;
boolean raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a contact from the database.
@@ -671,6 +691,13 @@ interface Database<T> {
*/
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
/**
* Sets the cleanup timer duration for the given message. This does not
* start the message's cleanup timer.
*/
void setCleanupTimerDuration(T txn, MessageId m, long duration)
throws DbException;
/**
* Marks the given contact as verified.
*/
@@ -701,9 +728,10 @@ interface Database<T> {
void setMessagePermanent(T txn, MessageId m) throws DbException;
/**
* Marks the given message as shared.
* Marks the given message as shared or not.
*/
void setMessageShared(T txn, MessageId m) throws DbException;
void setMessageShared(T txn, MessageId m, boolean shared)
throws DbException;
/**
* Sets the validation and delivery state of the given message.
@@ -730,6 +758,22 @@ interface Database<T> {
void setTransportKeysActive(T txn, TransportId t, KeySetId k)
throws DbException;
/**
* Starts the cleanup timer for the given message, if a timer duration
* has been set and the timer has not already been started.
*
* @return The cleanup deadline, or
* {@link DatabaseComponent#TIMER_NOT_STARTED} if no timer duration has
* been set for this message or its timer has already been started.
*/
long startCleanupTimer(T txn, MessageId m) throws DbException;
/**
* Stops the cleanup timer for the given message, if the timer has been
* started.
*/
void stopCleanupTimer(T txn, MessageId m) throws DbException;
/**
* Updates the transmission count, expiry time and estimated time of arrival
* of the given message with respect to the given contact, using the latency

View File

@@ -1,5 +1,6 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.cleanup.event.CleanupTimerStartedEvent;
import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContact;
@@ -606,6 +607,13 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessagesToShare(txn);
}
@Override
public Map<MessageId, GroupId> getMessagesToDelete(Transaction transaction)
throws DbException {
T txn = unbox(transaction);
return db.getMessagesToDelete(txn);
}
@Override
public Map<MessageId, Metadata> getMessageMetadata(Transaction transaction,
GroupId g) throws DbException {
@@ -701,6 +709,13 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageDependents(txn, m);
}
@Override
public long getNextCleanupDeadline(Transaction transaction)
throws DbException {
T txn = unbox(transaction);
return db.getNextCleanupDeadline(txn);
}
@Override
public long getNextSendTime(Transaction transaction, ContactId c)
throws DbException {
@@ -804,8 +819,15 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
Collection<MessageId> acked = new ArrayList<>();
for (MessageId m : a.getMessageIds()) {
if (db.containsVisibleMessage(txn, c, m)) {
db.raiseSeenFlag(txn, c, m);
acked.add(m);
if (db.raiseSeenFlag(txn, c, m)) {
// This is the first time the message has been acked
long deadline = db.startCleanupTimer(txn, m);
if (deadline != TIMER_NOT_STARTED) {
transaction.attach(new CleanupTimerStartedEvent(m,
deadline));
}
acked.add(m);
}
}
}
if (acked.size() > 0) {
@@ -961,6 +983,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.removeTransportKeys(txn, t, k);
}
@Override
public void setCleanupTimerDuration(Transaction transaction, MessageId m,
long duration) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
db.setCleanupTimerDuration(txn, m, duration);
}
@Override
public void setContactVerified(Transaction transaction, ContactId c)
throws DbException {
@@ -1010,6 +1042,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.setMessagePermanent(txn, m);
}
@Override
public void setMessageNotShared(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
db.setMessageShared(txn, m, false);
}
@Override
public void setMessageShared(Transaction transaction, MessageId m)
throws DbException {
@@ -1019,7 +1061,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchMessageException();
if (db.getMessageState(txn, m) != DELIVERED)
throw new IllegalArgumentException("Shared undelivered message");
db.setMessageShared(txn, m);
db.setMessageShared(txn, m, true);
transaction.attach(new MessageSharedEvent(m));
}
@@ -1091,6 +1133,30 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.setTransportKeysActive(txn, t, k);
}
@Override
public long startCleanupTimer(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
long deadline = db.startCleanupTimer(txn, m);
if (deadline != TIMER_NOT_STARTED) {
transaction.attach(new CleanupTimerStartedEvent(m, deadline));
}
return deadline;
}
@Override
public void stopCleanupTimer(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
db.stopCleanupTimer(txn, m);
}
@Override
public void updateTransportKeys(Transaction transaction,
Collection<TransportKeySet> keys) throws DbException {

View File

@@ -72,6 +72,8 @@ import static java.util.Arrays.asList;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE;
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
import static org.briarproject.bramble.api.db.Metadata.REMOVE;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
@@ -98,7 +100,7 @@ import static org.briarproject.bramble.util.LogUtils.now;
abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing
static final int CODE_SCHEMA_VERSION = 47;
static final int CODE_SCHEMA_VERSION = 48;
// Time period offsets for incoming transport keys
private static final int OFFSET_PREV = -1;
@@ -180,6 +182,11 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " state INT NOT NULL,"
+ " shared BOOLEAN NOT NULL,"
+ " temporary BOOLEAN NOT NULL,"
// Null if no timer duration has been set
+ " cleanupTimerDuration BIGINT,"
// Null if no timer duration has been set or the timer
// hasn't started
+ " cleanupDeadline BIGINT,"
+ " length INT NOT NULL,"
+ " raw BLOB," // Null if message has been deleted
+ " PRIMARY KEY (messageId),"
@@ -336,6 +343,10 @@ abstract class JdbcDatabase implements Database<Connection> {
"CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp"
+ " ON statuses (contactId, timestamp)";
private static final String INDEX_MESSAGES_BY_CLEANUP_DEADLINE =
"CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline"
+ " ON messages (cleanupDeadline)";
private static final Logger LOG =
getLogger(JdbcDatabase.class.getName());
@@ -463,7 +474,8 @@ abstract class JdbcDatabase implements Database<Connection> {
new Migration43_44(dbTypes),
new Migration44_45(),
new Migration45_46(),
new Migration46_47(dbTypes)
new Migration46_47(dbTypes),
new Migration47_48()
);
}
@@ -531,6 +543,7 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(INDEX_MESSAGE_DEPENDENCIES_BY_DEPENDENCY_ID);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP);
s.executeUpdate(INDEX_MESSAGES_BY_CLEANUP_DEADLINE);
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
@@ -1290,7 +1303,9 @@ abstract class JdbcDatabase implements Database<Connection> {
public void deleteMessage(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE messages SET raw = NULL WHERE messageId = ?";
String sql = "UPDATE messages"
+ " SET raw = NULL, cleanupDeadline = NULL"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
int affected = ps.executeUpdate();
@@ -1769,7 +1784,6 @@ abstract class JdbcDatabase implements Database<Connection> {
// Return early if there are no matches
if (intersection.isEmpty()) return Collections.emptySet();
}
if (intersection == null) throw new AssertionError();
return intersection;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
@@ -2226,6 +2240,33 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public Map<MessageId, GroupId> getMessagesToDelete(Connection txn)
throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messageId, groupId FROM messages"
+ " WHERE cleanupDeadline <= ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, now);
rs = ps.executeQuery();
Map<MessageId, GroupId> ids = new HashMap<>();
while (rs.next()) {
ids.put(new MessageId(rs.getBytes(1)),
new GroupId(rs.getBytes(2)));
}
rs.close();
ps.close();
return ids;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public long getNextSendTime(Connection txn, ContactId c)
throws DbException {
@@ -2256,6 +2297,31 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public long getNextCleanupDeadline(Connection txn) throws DbException {
Statement s = null;
ResultSet rs = null;
try {
String sql = "SELECT cleanupDeadline FROM messages"
+ " WHERE cleanupDeadline IS NOT NULL"
+ " ORDER BY cleanupDeadline LIMIT 1";
s = txn.createStatement();
rs = s.executeQuery(sql);
long nextDeadline = NO_CLEANUP_DEADLINE;
if (rs.next()) {
nextDeadline = rs.getLong(1);
if (rs.next()) throw new AssertionError();
}
rs.close();
s.close();
return nextDeadline;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public PendingContact getPendingContact(Connection txn, PendingContactId p)
throws DbException {
@@ -2776,7 +2842,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public void raiseSeenFlag(Connection txn, ContactId c, MessageId m)
public boolean raiseSeenFlag(Connection txn, ContactId c, MessageId m)
throws DbException {
PreparedStatement ps = null;
try {
@@ -2788,6 +2854,7 @@ abstract class JdbcDatabase implements Database<Connection> {
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
return affected == 1;
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
@@ -3021,6 +3088,25 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void setCleanupTimerDuration(Connection txn, MessageId m,
long duration) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE messages SET cleanupTimerDuration = ?"
+ " WHERE messageId = ? AND cleanupTimerDuration IS NULL";
ps = txn.prepareStatement(sql);
ps.setLong(1, duration);
ps.setBytes(2, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void setContactVerified(Connection txn, ContactId c)
throws DbException {
@@ -3128,22 +3214,24 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public void setMessageShared(Connection txn, MessageId m)
public void setMessageShared(Connection txn, MessageId m, boolean shared)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE messages SET shared = TRUE"
String sql = "UPDATE messages SET shared = ?"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setBoolean(1, shared);
ps.setBytes(2, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
// Update denormalised column in statuses
sql = "UPDATE statuses SET messageShared = TRUE"
sql = "UPDATE statuses SET messageShared = ?"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setBoolean(1, shared);
ps.setBytes(2, m.getBytes());
affected = ps.executeUpdate();
if (affected < 0) throw new DbStateException();
ps.close();
@@ -3272,6 +3360,62 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public long startCleanupTimer(Connection txn, MessageId m)
throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "UPDATE messages"
+ " SET cleanupDeadline = ? + cleanupTimerDuration"
+ " WHERE messageId = ?"
+ " AND cleanupTimerDuration IS NOT NULL"
+ " AND cleanupDeadline IS NULL";
ps = txn.prepareStatement(sql);
ps.setLong(1, now);
ps.setBytes(2, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
if (affected == 0) return TIMER_NOT_STARTED;
sql = "SELECT cleanupDeadline FROM messages WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
rs = ps.executeQuery();
if (!rs.next()) throw new DbStateException();
long deadline = rs.getLong(1);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
return deadline;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void stopCleanupTimer(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "UPDATE messages SET cleanupDeadline = NULL"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void updateExpiryTimeAndEta(Connection txn, ContactId c, MessageId m,
int maxLatency) throws DbException {

View File

@@ -0,0 +1,47 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration47_48 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration47_48.class.getName());
@Override
public int getStartVersion() {
return 47;
}
@Override
public int getEndVersion() {
return 48;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
// Null if no timer duration has been set
s.execute("ALTER TABLE messages"
+ " ADD COLUMN cleanupTimerDuration BIGINT");
// Null if no timer duration has been set or the timer
// hasn't started
s.execute("ALTER TABLE messages"
+ " ADD COLUMN cleanupDeadline BIGINT");
s.execute("CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline"
+ " ON messages (cleanupDeadline)");
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}

View File

@@ -1,5 +1,6 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.cleanup.event.CleanupTimerStartedEvent;
import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContactId;
@@ -69,6 +70,8 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.HOURS;
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
@@ -610,11 +613,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
throws Exception {
context.checking(new Expectations() {{
// Check whether the message is in the DB (which it's not)
exactly(12).of(database).startTransaction();
exactly(15).of(database).startTransaction();
will(returnValue(txn));
exactly(12).of(database).containsMessage(txn, messageId);
exactly(15).of(database).containsMessage(txn, messageId);
will(returnValue(false));
exactly(12).of(database).abortTransaction(txn);
exactly(15).of(database).abortTransaction(txn);
// Allow other checks to pass
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
@@ -639,7 +642,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
}
try {
db.transaction(false, transaction ->
db.transaction(true, transaction ->
db.getMessage(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
@@ -647,7 +650,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
}
try {
db.transaction(false, transaction ->
db.transaction(true, transaction ->
db.getMessageMetadata(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
@@ -655,7 +658,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
}
try {
db.transaction(false, transaction ->
db.transaction(true, transaction ->
db.getMessageState(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
@@ -663,7 +666,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
}
try {
db.transaction(false, transaction ->
db.transaction(true, transaction ->
db.getMessageStatus(transaction, contactId, messageId));
fail();
} catch (NoSuchMessageException expected) {
@@ -678,6 +681,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
// Expected
}
try {
db.transaction(false, transaction ->
db.setCleanupTimerDuration(transaction, message.getId(),
HOURS.toMillis(1)));
fail();
} catch (NoSuchMessageException expected) {
// Expected
}
try {
db.transaction(false, transaction ->
db.setMessagePermanent(transaction, message.getId()));
@@ -703,7 +715,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
}
try {
db.transaction(false, transaction ->
db.transaction(true, transaction ->
db.getMessageDependencies(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
@@ -711,12 +723,28 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
}
try {
db.transaction(false, transaction ->
db.transaction(true, transaction ->
db.getMessageDependents(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
// Expected
}
try {
db.transaction(false, transaction ->
db.startCleanupTimer(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
// Expected
}
try {
db.transaction(false, transaction ->
db.stopCleanupTimer(transaction, messageId));
fail();
} catch (NoSuchMessageException expected) {
// Expected
}
}
@Test
@@ -997,6 +1025,9 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
will(returnValue(true));
oneOf(database).raiseSeenFlag(txn, contactId, messageId);
will(returnValue(true));
oneOf(database).startCleanupTimer(txn, messageId);
will(returnValue(TIMER_NOT_STARTED)); // No cleanup duration was set
oneOf(database).commitTransaction(txn);
oneOf(eventBus).broadcast(with(any(MessagesAckedEvent.class)));
}});
@@ -1009,6 +1040,56 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
});
}
@Test
public void testReceiveDuplicateAck() throws Exception {
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
will(returnValue(true));
oneOf(database).raiseSeenFlag(txn, contactId, messageId);
will(returnValue(false)); // Already acked
oneOf(database).commitTransaction(txn);
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
db.transaction(false, transaction -> {
Ack a = new Ack(singletonList(messageId));
db.receiveAck(transaction, contactId, a);
});
}
@Test
public void testReceiveAckWithCleanupTimer() throws Exception {
long deadline = System.currentTimeMillis();
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
will(returnValue(true));
oneOf(database).raiseSeenFlag(txn, contactId, messageId);
will(returnValue(true));
oneOf(database).startCleanupTimer(txn, messageId);
will(returnValue(deadline));
oneOf(database).commitTransaction(txn);
oneOf(eventBus).broadcast(with(any(
CleanupTimerStartedEvent.class)));
oneOf(eventBus).broadcast(with(any(MessagesAckedEvent.class)));
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
db.transaction(false, transaction -> {
Ack a = new Ack(singletonList(messageId));
db.receiveAck(transaction, contactId, a);
});
}
@Test
public void testReceiveMessage() throws Exception {
context.checking(new Expectations() {{

View File

@@ -53,10 +53,11 @@ import java.util.concurrent.atomic.AtomicLong;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE;
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
import static org.briarproject.bramble.api.db.Metadata.REMOVE;
import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
@@ -351,7 +352,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty());
// Sharing the message should make it sendable
db.setMessageShared(txn, messageId);
db.setMessageShared(txn, messageId, true);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
@@ -631,8 +632,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// The group should not be visible to the contact
assertEquals(INVISIBLE, db.getGroupVisibility(txn, contactId, groupId));
assertEquals(emptyMap(),
db.getGroupVisibility(txn, groupId));
assertTrue(db.getGroupVisibility(txn, groupId).isEmpty());
// Make the group visible to the contact
db.addGroupVisibility(txn, contactId, groupId, false);
@@ -655,8 +655,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Make the group invisible again
db.removeGroupVisibility(txn, contactId, groupId);
assertEquals(INVISIBLE, db.getGroupVisibility(txn, contactId, groupId));
assertEquals(emptyMap(),
db.getGroupVisibility(txn, groupId));
assertTrue(db.getGroupVisibility(txn, groupId).isEmpty());
db.commitTransaction(txn);
db.close();
@@ -2040,7 +2039,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
// Share the message - now it should be sendable immediately
db.setMessageShared(txn, messageId);
db.setMessageShared(txn, messageId, true);
assertEquals(0, db.getNextSendTime(txn, contactId));
// Mark the message as requested - it should still be sendable
@@ -2347,6 +2346,87 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.close();
}
@Test
public void testCleanupTimer() throws Exception {
long duration = 60_000;
long now = System.currentTimeMillis();
AtomicLong time = new AtomicLong(now);
Database<Connection> db =
open(false, new TestMessageFactory(), new SettableClock(time));
Connection txn = db.startTransaction();
// No messages should be due or scheduled for deletion
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn));
// Add a group and a message
db.addGroup(txn, group);
db.addMessage(txn, message, DELIVERED, false, false, null);
// No messages should be due or scheduled for deletion
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn));
// Set the message's cleanup timer duration
db.setCleanupTimerDuration(txn, messageId, duration);
// No messages should be due or scheduled for deletion
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn));
// Start the message's cleanup timer
assertEquals(now + duration, db.startCleanupTimer(txn, messageId));
// The timer can't be started again
assertEquals(TIMER_NOT_STARTED, db.startCleanupTimer(txn, messageId));
// No messages should be due for deletion, but the message should be
// scheduled for deletion
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(now + duration, db.getNextCleanupDeadline(txn));
// Stop the timer
db.stopCleanupTimer(txn, messageId);
// No messages should be due or scheduled for deletion
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn));
// Start the timer again
assertEquals(now + duration, db.startCleanupTimer(txn, messageId));
// No messages should be due for deletion, but the message should be
// scheduled for deletion
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(now + duration, db.getNextCleanupDeadline(txn));
// 1 ms before the timer expires, no messages should be due for
// deletion but the message should be scheduled for deletion
time.set(now + duration - 1);
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(now + duration, db.getNextCleanupDeadline(txn));
// When the timer expires, the message should be due and scheduled for
// deletion
time.set(now + duration);
assertEquals(singletonMap(messageId, groupId),
db.getMessagesToDelete(txn));
assertEquals(now + duration, db.getNextCleanupDeadline(txn));
// 1 ms after the timer expires, the message should be due and
// scheduled for deletion
time.set(now + duration + 1);
assertEquals(singletonMap(messageId, groupId),
db.getMessagesToDelete(txn));
assertEquals(now + duration, db.getNextCleanupDeadline(txn));
// Once the message has been deleted, it should no longer be due
// or scheduled for deletion
db.deleteMessage(txn, messageId);
assertTrue(db.getMessagesToDelete(txn).isEmpty());
assertEquals(NO_CLEANUP_DEADLINE, db.getNextCleanupDeadline(txn));
}
private Database<Connection> open(boolean resume) throws Exception {
return open(resume, new TestMessageFactory(), new SystemClock());
}