Add database method to reset retransmission times

Will be used to ensure messages are not stranded on a Mailbox, when such
is added, removed, or otherwise changed.

Closes #2190.
This commit is contained in:
Daniel Lublin
2021-12-08 09:15:03 +01:00
parent 8d5803098b
commit 3eb3dbde09
5 changed files with 106 additions and 2 deletions

View File

@@ -471,6 +471,14 @@ public interface DatabaseComponent extends TransactionManager {
Map<MessageId, Integer> getUnackedMessagesToSend(Transaction txn,
ContactId c) throws DbException;
/**
* Reset the transmission count, expiry time and ETA of all messages that
* are eligible to be sent to the given contact. Messages are selected in
* the same way as {@link #getUnackedMessagesToSend(Transaction, ContactId)}
*/
void resetUnackedMessagesToSend(Transaction txn, ContactId c)
throws DbException;
/**
* Returns the total length, including headers, of all messages that are
* eligible to be sent to the given contact. This may include messages

View File

@@ -757,6 +757,13 @@ interface Database<T> {
*/
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
/**
* Resets the transmission count, expiry time and ETA of the given messages
* with respect to the given contact.
*/
void resetExpiryTimeAndEta(T txn, ContactId c, Collection<MessageId> ids)
throws DbException;
/**
* Sets the cleanup timer duration for the given message. This does not
* start the message's cleanup timer.
@@ -845,8 +852,8 @@ interface Database<T> {
* of the given message with respect to the given contact, using the latency
* of the transport over which it was sent.
*/
void updateExpiryTimeAndEta(T txn, ContactId c, MessageId m, long maxLatency)
throws DbException;
void updateExpiryTimeAndEta(T txn, ContactId c, MessageId m,
long maxLatency) throws DbException;
/**
* Stores the given transport keys, deleting any keys they have replaced.

View File

@@ -750,6 +750,18 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getUnackedMessagesToSend(txn, c);
}
@Override
public void resetUnackedMessagesToSend(Transaction transaction, ContactId c)
throws DbException {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> unackedToSend =
new ArrayList<>(db.getUnackedMessagesToSend(txn, c).keySet());
if (unackedToSend.isEmpty()) return;
db.resetExpiryTimeAndEta(txn, c, unackedToSend);
}
@Override
public long getUnackedMessageBytesToSend(Transaction transaction,
ContactId c) throws DbException {

View File

@@ -3290,6 +3290,34 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void resetExpiryTimeAndEta(Connection txn, ContactId c,
Collection<MessageId> ids)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE statuses SET expiry = 0, txCount = 0, eta = 0"
+ " WHERE contactId = ? AND messageId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for (MessageId m : ids) {
ps.setBytes(2, m.getBytes());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != ids.size()) {
throw new DbStateException();
}
for (int rows : batchAffected) {
if (rows < 0 || rows > 1) throw new DbStateException();
}
ps.close();
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void setCleanupTimerDuration(Connection txn, MessageId m,
long duration) throws DbException {

View File

@@ -2197,6 +2197,55 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.close();
}
@Test
public void testResetRetransmissionTimes() throws Exception {
long now = System.currentTimeMillis();
AtomicLong time = new AtomicLong(now);
Database<Connection> db =
open(false, new TestMessageFactory(), new SettableClock(time));
Connection txn = db.startTransaction();
// Add a contact, a shared group and a shared message
db.addIdentity(txn, identity);
assertEquals(contactId,
db.addContact(txn, author, localAuthor.getId(), null, true));
db.addGroup(txn, group);
db.addGroupVisibility(txn, contactId, groupId, true);
db.addMessage(txn, message, DELIVERED, true, false, null);
// Time: now
// Retrieve the message from the database
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids);
// Time: now
// Mark the message as sent
db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId));
// Time: now + MAX_LATENCY * 2 - 1
// The message should not yet be sendable
time.set(now + MAX_LATENCY * 2 - 1);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty());
// Reset the retransmission times
db.resetExpiryTimeAndEta(txn, contactId, singletonList(messageId));
// The message should have infinitely short expiry
assertEquals(0, db.getNextSendTime(txn, contactId));
// The message should be sendable
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertFalse(ids.isEmpty());
db.commitTransaction(txn);
db.close();
}
@Test
public void testCompactionTime() throws Exception {
MessageFactory messageFactory = new TestMessageFactory();