Replace ETA with max latency in retransmission logic

This commit is contained in:
Daniel Lublin
2022-03-25 13:53:08 +01:00
parent edc1029e92
commit dd1c8c8301
7 changed files with 106 additions and 62 deletions

View File

@@ -472,9 +472,9 @@ public interface DatabaseComponent extends TransactionManager {
ContactId c) throws DbException;
/**
* Reset the transmission count, expiry time and ETA of all messages that
* are eligible to be sent to the given contact. This includes messages that
* have already been sent and are not yet due for retransmission.
* Resets the transmission count, expiry time and max latency of all messages
* that are eligible to be sent to the given contact. This includes messages
* that have already been sent and are not yet due for retransmission.
*/
void resetUnackedMessagesToSend(Transaction txn, ContactId c)
throws DbException;

View File

@@ -758,9 +758,10 @@ interface Database<T> {
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
/**
* Resets the transmission count, expiry time and ETA of all messages that
* are eligible to be sent to the given contact. This includes messages that
* have already been sent and are not yet due for retransmission.
* Resets the transmission count, expiry time and max latency of all
* messages that are eligible to be sent to the given contact. This includes
* messages that have already been sent and are not yet due for
* retransmission.
*/
void resetUnackedMessagesToSend(T txn, ContactId c) throws DbException;
@@ -848,11 +849,13 @@ interface Database<T> {
void stopCleanupTimer(T txn, MessageId m) throws DbException;
/**
* Updates the transmission count, expiry time and estimated time of arrival
* of the given message with respect to the given contact, using the latency
* of the transport over which it was sent.
* Updates the transmission count, expiry time and max latency of the given
* message with respect to the given contact.
*
* @param maxLatency latency of the transport over which the message was
* sent.
*/
void updateExpiryTimeAndEta(T txn, ContactId c, MessageId m,
void updateRetransmissionData(T txn, ContactId c, MessageId m,
long maxLatency) throws DbException;
/**

View File

@@ -437,7 +437,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
Message message = db.getMessage(txn, m);
totalLength += message.getRawLength();
messages.add(message);
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
db.updateRetransmissionData(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
@@ -462,7 +462,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
totalLength += message.getRawLength();
messages.add(message);
sentIds.add(m);
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
db.updateRetransmissionData(txn, c, m, maxLatency);
}
}
if (messages.isEmpty()) return messages;
@@ -483,7 +483,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
if (ids.isEmpty()) return null;
for (MessageId m : ids)
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
db.updateRetransmissionData(txn, c, m, maxLatency);
return new Offer(ids);
}
@@ -518,7 +518,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
Message message = db.getMessage(txn, m);
totalLength += message.getRawLength();
messages.add(message);
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
db.updateRetransmissionData(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);

View File

@@ -102,7 +102,7 @@ import static org.briarproject.bramble.util.LogUtils.now;
abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing
static final int CODE_SCHEMA_VERSION = 49;
static final int CODE_SCHEMA_VERSION = 50;
// Time period offsets for incoming transport keys
private static final int OFFSET_PREV = -1;
@@ -252,7 +252,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " requested BOOLEAN NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " txCount INT NOT NULL,"
+ " eta BIGINT NOT NULL,"
+ " maxLatency BIGINT," // Null if latency was reset
+ " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (messageId)"
+ " REFERENCES messages (messageId)"
@@ -502,7 +502,8 @@ abstract class JdbcDatabase implements Database<Connection> {
new Migration45_46(),
new Migration46_47(dbTypes),
new Migration47_48(),
new Migration48_49()
new Migration48_49(),
new Migration49_50()
);
}
@@ -920,9 +921,10 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
String sql = "INSERT INTO statuses (messageId, contactId, groupId,"
+ " timestamp, length, state, groupShared, messageShared,"
+ " deleted, ack, seen, requested, expiry, txCount, eta)"
+ " deleted, ack, seen, requested, expiry, txCount,"
+ " maxLatency)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0,"
+ " 0)";
+ " NULL)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
@@ -1156,17 +1158,17 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setInt(2, DELIVERED.getValue());
} else {
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
sql = "SELECT NULL FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE"
+ " AND (expiry <= ? OR eta > ?)";
+ " AND (expiry <= ? OR maxLatency IS NULL"
+ " OR ? < maxLatency)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setLong(4, eta);
ps.setLong(4, maxLatency);
}
rs = ps.executeQuery();
boolean messagesToSend = rs.next();
@@ -2194,7 +2196,6 @@ abstract class JdbcDatabase implements Database<Connection> {
public Collection<MessageId> getMessagesToOffer(Connection txn,
ContactId c, int maxMessages, long maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -2203,13 +2204,14 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = FALSE"
+ " AND (expiry <= ? OR eta > ?)"
+ " AND (expiry <= ? OR maxLatency IS NULL"
+ " OR ? < maxLatency)"
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setLong(4, eta);
ps.setLong(4, maxLatency);
ps.setInt(5, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
@@ -2253,7 +2255,6 @@ abstract class JdbcDatabase implements Database<Connection> {
public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c,
int maxLength, long maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -2262,13 +2263,14 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE"
+ " AND (expiry <= ? OR eta > ?)"
+ " AND (expiry <= ? OR maxLatency IS NULL"
+ " OR ? < maxLatency)"
+ " ORDER BY timestamp";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setLong(4, eta);
ps.setLong(4, maxLatency);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
int total = 0;
@@ -2552,7 +2554,6 @@ abstract class JdbcDatabase implements Database<Connection> {
public Collection<MessageId> getRequestedMessagesToSend(Connection txn,
ContactId c, int maxLength, long maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -2561,13 +2562,14 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = TRUE"
+ " AND (expiry <= ? OR eta > ?)"
+ " AND (expiry <= ? OR maxLatency IS NULL"
+ " OR ? < maxLatency)"
+ " ORDER BY timestamp";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setLong(4, eta);
ps.setLong(4, maxLatency);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
int total = 0;
@@ -3298,7 +3300,8 @@ abstract class JdbcDatabase implements Database<Connection> {
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE statuses SET expiry = 0, txCount = 0, eta = 0"
String sql = "UPDATE statuses SET expiry = 0, txCount = 0,"
+ " maxLatency = NULL"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE";
@@ -3643,8 +3646,8 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public void updateExpiryTimeAndEta(Connection txn, ContactId c, MessageId m,
long maxLatency) throws DbException {
public void updateRetransmissionData(Connection txn, ContactId c,
MessageId m, long maxLatency) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -3660,13 +3663,12 @@ abstract class JdbcDatabase implements Database<Connection> {
rs.close();
ps.close();
sql = "UPDATE statuses"
+ " SET expiry = ?, txCount = txCount + 1, eta = ?"
+ " SET expiry = ?, txCount = txCount + 1, maxLatency = ?"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
ps.setLong(1, calculateExpiry(now, maxLatency, txCount));
ps.setLong(2, eta);
ps.setLong(2, maxLatency);
ps.setBytes(3, m.getBytes());
ps.setInt(4, c.getInt());
int affected = ps.executeUpdate();

View File

@@ -0,0 +1,45 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration49_50 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration49_50.class.getName());
@Override
public int getStartVersion() {
return 49;
}
@Override
public int getEndVersion() {
return 50;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("ALTER TABLE statuses"
+ " ALTER COLUMN eta"
+ " RENAME TO maxLatency");
s.execute("ALTER TABLE statuses"
+ " ALTER COLUMN maxLatency"
+ " SET NULL");
s.execute("UPDATE statuses SET maxLatency = NULL");
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}

View File

@@ -922,11 +922,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(ids));
oneOf(database).getMessage(txn, messageId);
will(returnValue(message));
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
oneOf(database).updateRetransmissionData(txn, contactId, messageId,
maxLatency);
oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1));
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
oneOf(database).updateRetransmissionData(txn, contactId, messageId1,
maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn);
@@ -951,9 +951,9 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(true));
oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency);
will(returnValue(ids));
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
oneOf(database).updateRetransmissionData(txn, contactId, messageId,
maxLatency);
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
oneOf(database).updateRetransmissionData(txn, contactId, messageId1,
maxLatency);
oneOf(database).commitTransaction(txn);
}});
@@ -1005,12 +1005,12 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(ids));
oneOf(database).getMessage(txn, messageId);
will(returnValue(message));
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency);
oneOf(database).updateRetransmissionData(txn, contactId,
messageId, maxLatency);
oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1));
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
maxLatency);
oneOf(database).updateRetransmissionData(txn, contactId,
messageId1, maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn);
oneOf(eventBus).broadcast(with(any(MessagesSentEvent.class)));

View File

@@ -444,7 +444,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertOneMessageToSendEagerly(db, txn);
// Mark the message as sent
db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
// The message should no longer be sendable via lazy retransmission,
// but it should still be sendable via eager retransmission
@@ -1811,7 +1811,8 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertFalse(status.isSeen());
// Pretend the message was sent to the contact
db.updateExpiryTimeAndEta(txn, contactId, messageId, Integer.MAX_VALUE);
db.updateRetransmissionData(txn, contactId, messageId,
Integer.MAX_VALUE);
// The message should be sent but not seen
status = db.getMessageStatus(txn, contactId, messageId);
@@ -2052,12 +2053,12 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Update the message's expiry time as though we sent it - now the
// message should be sendable after one round-trip
db.updateExpiryTimeAndEta(txn, contactId, messageId, 1000);
db.updateRetransmissionData(txn, contactId, messageId, 1000);
assertEquals(now + 2000, db.getNextSendTime(txn, contactId));
// Update the message's expiry time again - now it should be sendable
// after two round-trips
db.updateExpiryTimeAndEta(txn, contactId, messageId, 1000);
db.updateRetransmissionData(txn, contactId, messageId, 1000);
assertEquals(now + 4000, db.getNextSendTime(txn, contactId));
// Delete the message - there should be no messages to send
@@ -2124,7 +2125,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Time: now
// Mark the message as sent
db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId));
@@ -2161,36 +2162,29 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.addGroupVisibility(txn, contactId, groupId, true);
db.addMessage(txn, message, DELIVERED, true, false, null);
// Time: now
// Retrieve the message from the database
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids);
// Time: now
// Mark the message as sent
db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId));
// Time: now
// The message should not be sendable via the same transport
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty());
// Time: now
// The message should be sendable via a transport with a faster ETA
// The message should be sendable via a transport with a lower latency
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE,
MAX_LATENCY - 1);
assertEquals(singletonList(messageId), ids);
// Time: now + 1
// The message should no longer be sendable via the faster transport,
// as the ETA is now equal
time.set(now + 1);
// The message should not be sendable via a slower transport
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE,
MAX_LATENCY - 1);
MAX_LATENCY + 1);
assertTrue(ids.isEmpty());
db.commitTransaction(txn);
@@ -2221,7 +2215,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Time: now
// Mark the message as sent
db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId));