diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
index ff6ef3885..a76138c1f 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
@@ -378,6 +378,16 @@ public interface DatabaseComponent {
MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m)
throws DbException;
+ /*
+ * Returns the next time (in milliseconds since the Unix epoch) when a
+ * message is due to be sent to the given contact. The returned value may
+ * be zero if a message is due to be sent immediately, or Long.MAX_VALUE if
+ * no messages are scheduled to be sent.
+ *
+ * Read-only.
+ */
+ long getNextSendTime(Transaction txn, ContactId c) throws DbException;
+
/**
* Returns all settings in the given namespace.
*
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java
index 71b351452..b2c60caba 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java
@@ -456,6 +456,16 @@ interface Database {
Collection getMessagesToShare(T txn, ClientId c)
throws DbException;
+ /**
+ * Returns the next time (in milliseconds since the Unix epoch) when a
+ * message is due to be sent to the given contact. The returned value may
+ * be zero if a message is due to be sent immediately, or Long.MAX_VALUE
+ * if no messages are scheduled to be sent.
+ *
+ * Read-only.
+ */
+ long getNextSendTime(T txn, ContactId c) throws DbException;
+
/**
* Returns the message with the given ID, in serialised form, or null if
* the message has been deleted.
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
index 570ec013a..75149a1a9 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
@@ -579,6 +579,13 @@ class DatabaseComponentImpl implements DatabaseComponent {
return db.getMessageDependents(txn, m);
}
+ @Override
+ public long getNextSendTime(Transaction transaction, ContactId c)
+ throws DbException {
+ T txn = unbox(transaction);
+ return db.getNextSendTime(txn, c);
+ }
+
@Override
public Settings getSettings(Transaction transaction, String namespace)
throws DbException {
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java
index 29e91ce24..627912657 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java
@@ -1844,6 +1844,41 @@ abstract class JdbcDatabase implements Database {
}
}
+ @Override
+ public long getNextSendTime(Connection txn, ContactId c)
+ throws DbException {
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = "SELECT expiry FROM messages AS m"
+ + " JOIN groupVisibilities AS gv"
+ + " ON m.groupId = gv.groupId"
+ + " JOIN statuses AS s"
+ + " ON m.messageId = s.messageId"
+ + " AND gv.contactId = s.contactId"
+ + " WHERE gv.contactId = ? AND gv.shared = TRUE"
+ + " AND state = ? AND m.shared = TRUE AND raw IS NOT NULL"
+ + " AND seen = FALSE"
+ + " ORDER BY expiry LIMIT 1";
+ ps = txn.prepareStatement(sql);
+ ps.setInt(1, c.getInt());
+ ps.setInt(2, DELIVERED.getValue());
+ rs = ps.executeQuery();
+ long nextSendTime = Long.MAX_VALUE;
+ if (rs.next()) {
+ nextSendTime = rs.getLong(1);
+ if (rs.next()) throw new AssertionError();
+ }
+ rs.close();
+ ps.close();
+ return nextSendTime;
+ } catch (SQLException e) {
+ tryToClose(rs);
+ tryToClose(ps);
+ throw new DbException(e);
+ }
+ }
+
@Override
@Nullable
public byte[] getRawMessage(Connection txn, MessageId m)
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
index f5e892405..3fdabc01d 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
@@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
@@ -50,13 +51,14 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD
@NotNullByDefault
class DuplexOutgoingSession implements SyncSession, EventListener {
- // Check for retransmittable records once every 60 seconds
- private static final int RETX_QUERY_INTERVAL = 60 * 1000;
private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName());
private static final ThrowingRunnable CLOSE = () -> {
};
+ private static final ThrowingRunnable
+ NEXT_SEND_TIME_DECREASED = () -> {
+ };
private final DatabaseComponent db;
private final Executor dbExecutor;
@@ -72,6 +74,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false);
private final AtomicBoolean generateRequestQueued =
new AtomicBoolean(false);
+ private final AtomicLong nextSendTime = new AtomicLong(Long.MAX_VALUE);
private volatile boolean interrupted = false;
@@ -101,15 +104,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
generateRequest();
long now = clock.currentTimeMillis();
long nextKeepalive = now + maxIdleTime;
- long nextRetxQuery = now + RETX_QUERY_INTERVAL;
boolean dataToFlush = true;
// Write records until interrupted
try {
while (!interrupted) {
// Work out how long we should wait for a record
now = clock.currentTimeMillis();
- long wait = Math.min(nextKeepalive, nextRetxQuery) - now;
- if (wait < 0) wait = 0;
+ long keepaliveWait = Math.max(0, nextKeepalive - now);
+ long sendWait = Math.max(0, nextSendTime.get() - now);
+ long wait = Math.min(keepaliveWait, sendWait);
// Flush any unflushed data if we're going to wait
if (wait > 0 && dataToFlush && writerTasks.isEmpty()) {
recordWriter.flush();
@@ -121,20 +124,25 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
MILLISECONDS);
if (task == null) {
now = clock.currentTimeMillis();
- if (now >= nextRetxQuery) {
- // Check for retransmittable records
+ if (now >= nextSendTime.get()) {
+ // Check for retransmittable messages
+ LOG.info("Checking for retransmittable messages");
+ setNextSendTime(Long.MAX_VALUE);
generateBatch();
generateOffer();
- nextRetxQuery = now + RETX_QUERY_INTERVAL;
}
if (now >= nextKeepalive) {
// Flush the stream to keep it alive
+ LOG.info("Sending keepalive");
recordWriter.flush();
dataToFlush = false;
nextKeepalive = now + maxIdleTime;
}
} else if (task == CLOSE) {
+ LOG.info("Closed");
break;
+ } else if (task == NEXT_SEND_TIME_DECREASED) {
+ LOG.info("Next send time decreased");
} else {
task.run();
dataToFlush = true;
@@ -170,6 +178,11 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateRequest());
}
+ private void setNextSendTime(long time) {
+ long old = nextSendTime.getAndSet(time);
+ if (time < old) writerTasks.add(NEXT_SEND_TIME_DECREASED);
+ }
+
@Override
public void interrupt() {
interrupted = true;
@@ -259,6 +272,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
try {
b = db.generateRequestedBatch(txn, contactId,
MAX_RECORD_PAYLOAD_LENGTH, maxLatency);
+ setNextSendTime(db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
@@ -305,6 +319,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
try {
o = db.generateOffer(txn, contactId, MAX_MESSAGE_IDS,
maxLatency);
+ setNextSendTime(db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/H2DatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/H2DatabaseTest.java
index 4afea4bf4..89738a408 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/db/H2DatabaseTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/db/H2DatabaseTest.java
@@ -17,6 +17,7 @@ import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.MessageStatus;
import org.briarproject.bramble.api.sync.ValidationManager.State;
+import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.IncomingKeys;
import org.briarproject.bramble.api.transport.OutgoingKeys;
import org.briarproject.bramble.api.transport.TransportKeys;
@@ -1635,6 +1636,53 @@ public class H2DatabaseTest extends BrambleTestCase {
db.close();
}
+ @Test
+ public void testGetNextSendTime() throws Exception {
+ long now = System.currentTimeMillis();
+ Database db = open(false, new StoppedClock(now));
+ Connection txn = db.startTransaction();
+
+ // Add a contact, a group and a message
+ db.addLocalAuthor(txn, localAuthor);
+ assertEquals(contactId, db.addContact(txn, author, localAuthor.getId(),
+ true, true));
+ db.addGroup(txn, group);
+ db.addMessage(txn, message, UNKNOWN, false);
+
+ // There should be no messages to send
+ assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
+
+ // Share the group with the contact - still no messages to send
+ db.addGroupVisibility(txn, contactId, groupId, true);
+ db.addStatus(txn, contactId, messageId, false, false);
+ assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
+
+ // Set the message's state to DELIVERED - still no messages to send
+ db.setMessageState(txn, messageId, DELIVERED);
+ assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
+
+ // Share the message - now it should be sendable immediately
+ db.setMessageShared(txn, messageId);
+ assertEquals(0, db.getNextSendTime(txn, contactId));
+
+ // Update the message's expiry time as though we sent it - now the
+ // message should be sendable after one round-trip
+ db.updateExpiryTime(txn, contactId, messageId, 1000);
+ assertEquals(now + 2000, db.getNextSendTime(txn, contactId));
+
+ // Update the message's expiry time again - now it should be sendable
+ // after two round-trips
+ db.updateExpiryTime(txn, contactId, messageId, 1000);
+ assertEquals(now + 4000, db.getNextSendTime(txn, contactId));
+
+ // Delete the message - there should be no messages to send
+ db.deleteMessage(txn, messageId);
+ assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
+
+ db.commitTransaction(txn);
+ db.close();
+ }
+
@Test
public void testExceptionHandling() throws Exception {
Database db = open(false);
@@ -1652,8 +1700,13 @@ public class H2DatabaseTest extends BrambleTestCase {
}
private Database open(boolean resume) throws Exception {
+ return open(resume, new SystemClock());
+ }
+
+ private Database open(boolean resume, Clock clock)
+ throws Exception {
Database db = new H2Database(new TestDatabaseConfig(testDir,
- MAX_SIZE), new SystemClock());
+ MAX_SIZE), clock);
if (!resume) TestUtils.deleteTestDirectory(testDir);
db.open();
return db;
@@ -1683,4 +1736,23 @@ public class H2DatabaseTest extends BrambleTestCase {
public void tearDown() {
TestUtils.deleteTestDirectory(testDir);
}
+
+ private static class StoppedClock implements Clock {
+
+ private final long time;
+
+ private StoppedClock(long time) {
+ this.time = time;
+ }
+
+ @Override
+ public long currentTimeMillis() {
+ return time;
+ }
+
+ @Override
+ public void sleep(long milliseconds) throws InterruptedException {
+ Thread.sleep(milliseconds);
+ }
+ }
}