Consider latency when getting next send time from DB.

This commit is contained in:
akwizgran
2022-06-16 17:05:30 +01:00
parent ff9f706670
commit e2a63ee361
6 changed files with 72 additions and 31 deletions

View File

@@ -541,15 +541,18 @@ public interface DatabaseComponent extends TransactionManager {
*/ */
long getNextCleanupDeadline(Transaction txn) throws DbException; long getNextCleanupDeadline(Transaction txn) throws DbException;
/* /**
* Returns the next time (in milliseconds since the Unix epoch) when a * Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be sent to the given contact. The returned value may * message is due to be sent to the given contact over a transport with
* be zero if a message is due to be sent immediately, or Long.MAX_VALUE if * the given latency.
* no messages are scheduled to be sent. * <p>
* The returned value may be zero if a message is due to be sent
* immediately, or Long.MAX_VALUE if no messages are scheduled to be sent.
* <p/> * <p/>
* Read-only. * Read-only.
*/ */
long getNextSendTime(Transaction txn, ContactId c) throws DbException; long getNextSendTime(Transaction txn, ContactId c, long maxLatency)
throws DbException;
/** /**
* Returns the pending contact with the given ID. * Returns the pending contact with the given ID.

View File

@@ -587,13 +587,16 @@ interface Database<T> {
/** /**
* Returns the next time (in milliseconds since the Unix epoch) when a * Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be sent to the given contact. The returned value may * message is due to be sent to the given contact over a transport with
* be zero if a message is due to be sent immediately, or Long.MAX_VALUE * the given latency.
* if no messages are scheduled to be sent. * <p>
* The returned value may be zero if a message is due to be sent
* immediately, or Long.MAX_VALUE if no messages are scheduled to be sent.
* <p/> * <p/>
* Read-only. * Read-only.
*/ */
long getNextSendTime(T txn, ContactId c) throws DbException; long getNextSendTime(T txn, ContactId c, long maxLatency)
throws DbException;
/** /**
* Returns the pending contact with the given ID. * Returns the pending contact with the given ID.

View File

@@ -814,10 +814,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
} }
@Override @Override
public long getNextSendTime(Transaction transaction, ContactId c) public long getNextSendTime(Transaction transaction, ContactId c,
throws DbException { long maxLatency) throws DbException {
T txn = unbox(transaction); T txn = unbox(transaction);
return db.getNextSendTime(txn, c); return db.getNextSendTime(txn, c, maxLatency);
} }
@Override @Override

View File

@@ -2490,12 +2490,28 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
@Override @Override
public long getNextSendTime(Connection txn, ContactId c) public long getNextSendTime(Connection txn, ContactId c, long maxLatency)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
String sql = "SELECT expiry FROM statuses" // Are any messages sendable immediately?
String sql = "SELECT NULL FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE"
+ " AND (maxLatency IS NULL OR ? < maxLatency)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, maxLatency);
rs = ps.executeQuery();
boolean found = rs.next();
rs.close();
ps.close();
if (found) return 0;
// When is the earliest expiry time (could be in the past)?
sql = "SELECT expiry FROM statuses"
+ " WHERE contactId = ? AND state = ?" + " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE" + " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE" + " AND deleted = FALSE AND seen = FALSE"

View File

@@ -313,7 +313,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
Collection<Message> batch = Collection<Message> batch =
db.generateRequestedBatch(txn, contactId, db.generateRequestedBatch(txn, contactId,
BATCH_CAPACITY, maxLatency); BATCH_CAPACITY, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId)); setNextSendTime(db.getNextSendTime(txn, contactId,
maxLatency));
return batch; return batch;
}); });
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))
@@ -356,7 +357,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
Offer o = db.transactionWithNullableResult(false, txn -> { Offer o = db.transactionWithNullableResult(false, txn -> {
Offer offer = db.generateOffer(txn, contactId, Offer offer = db.generateOffer(txn, contactId,
MAX_MESSAGE_IDS, maxLatency); MAX_MESSAGE_IDS, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId)); setNextSendTime(db.getNextSendTime(txn, contactId,
maxLatency));
return offer; return offer;
}); });
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))

View File

@@ -2020,37 +2020,51 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.addMessage(txn, message, UNKNOWN, false, false, null); db.addMessage(txn, message, UNKNOWN, false, false, null);
// There should be no messages to send // There should be no messages to send
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); assertEquals(Long.MAX_VALUE,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// Share the group with the contact - still no messages to send // Share the group with the contact - still no messages to send
db.addGroupVisibility(txn, contactId, groupId, true); db.addGroupVisibility(txn, contactId, groupId, true);
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); assertEquals(Long.MAX_VALUE,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// Set the message's state to DELIVERED - still no messages to send // Set the message's state to DELIVERED - still no messages to send
db.setMessageState(txn, messageId, DELIVERED); db.setMessageState(txn, messageId, DELIVERED);
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); assertEquals(Long.MAX_VALUE,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// Share the message - now it should be sendable immediately // Share the message - now it should be sendable immediately
db.setMessageShared(txn, messageId, true); db.setMessageShared(txn, messageId, true);
assertEquals(0, db.getNextSendTime(txn, contactId)); assertEquals(0, db.getNextSendTime(txn, contactId, MAX_LATENCY));
// Mark the message as requested - it should still be sendable // Mark the message as requested - it should still be sendable
db.raiseRequestedFlag(txn, contactId, messageId); db.raiseRequestedFlag(txn, contactId, messageId);
assertEquals(0, db.getNextSendTime(txn, contactId)); assertEquals(0, db.getNextSendTime(txn, contactId, MAX_LATENCY));
// Update the message's expiry time as though we sent it - now the // Update the message's expiry time as though we sent it - now the
// message should be sendable after one round-trip // message should be sendable after one round-trip
db.updateRetransmissionData(txn, contactId, messageId, 1000); db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
assertEquals(now + 2000, db.getNextSendTime(txn, contactId)); assertEquals(now + MAX_LATENCY * 2,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// The message should be sendable immediately over a transport with
// lower latency
assertEquals(0L, db.getNextSendTime(txn, contactId, MAX_LATENCY - 1));
// Update the message's expiry time again - now it should be sendable // Update the message's expiry time again - now it should be sendable
// after two round-trips // after two round-trips
db.updateRetransmissionData(txn, contactId, messageId, 1000); db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
assertEquals(now + 4000, db.getNextSendTime(txn, contactId)); assertEquals(now + MAX_LATENCY * 4,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// The message should be sendable immediately over a transport with
// lower latency
assertEquals(0L, db.getNextSendTime(txn, contactId, MAX_LATENCY - 1));
// Delete the message - there should be no messages to send // Delete the message - there should be no messages to send
db.deleteMessage(txn, messageId); db.deleteMessage(txn, messageId);
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); assertEquals(Long.MAX_VALUE,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
db.commitTransaction(txn); db.commitTransaction(txn);
db.close(); db.close();
@@ -2115,7 +2129,8 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY); db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY // The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId)); assertEquals(now + MAX_LATENCY * 2,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// Time: now + MAX_LATENCY * 2 - 1 // Time: now + MAX_LATENCY * 2 - 1
// The message should not yet be sendable // The message should not yet be sendable
@@ -2158,7 +2173,8 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY); db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY // The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId)); assertEquals(now + MAX_LATENCY * 2,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// The message should not be sendable via the same transport // The message should not be sendable via the same transport
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
@@ -2205,7 +2221,8 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY); db.updateRetransmissionData(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY // The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId)); assertEquals(now + MAX_LATENCY * 2,
db.getNextSendTime(txn, contactId, MAX_LATENCY));
// Time: now + MAX_LATENCY * 2 - 1 // Time: now + MAX_LATENCY * 2 - 1
// The message should not yet be sendable // The message should not yet be sendable
@@ -2216,8 +2233,8 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Reset the retransmission times // Reset the retransmission times
db.resetUnackedMessagesToSend(txn, contactId); db.resetUnackedMessagesToSend(txn, contactId);
// The message should have infinitely short expiry // The message should be sendable immediately
assertEquals(0, db.getNextSendTime(txn, contactId)); assertEquals(0, db.getNextSendTime(txn, contactId, MAX_LATENCY));
// The message should be sendable // The message should be sendable
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);