From 1a70200b65d045f44fb551558b956c69bf85efd4 Mon Sep 17 00:00:00 2001 From: goapunk Date: Thu, 23 Aug 2018 15:25:50 +0200 Subject: [PATCH] Allow retransmission if faster. * This commit introduces an estimated time of arrival (eta) to the message status which helps to decide whether a message should be retransmitted over a faster transport. --- .../briarproject/bramble/test/ArrayClock.java | 23 +++ .../org/briarproject/bramble/db/Database.java | 16 +-- .../bramble/db/DatabaseComponentImpl.java | 26 ++-- .../briarproject/bramble/db/JdbcDatabase.java | 43 +++--- .../bramble/db/Migration39_40.java | 54 +++++++ .../bramble/crypto/ScryptKdfTest.java | 22 +-- .../bramble/db/DatabaseComponentImplTest.java | 18 +-- .../bramble/db/DatabasePerformanceTest.java | 9 +- .../bramble/db/JdbcDatabaseTest.java | 136 ++++++++++++------ 9 files changed, 239 insertions(+), 108 deletions(-) create mode 100644 bramble-api/src/test/java/org/briarproject/bramble/test/ArrayClock.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/db/Migration39_40.java diff --git a/bramble-api/src/test/java/org/briarproject/bramble/test/ArrayClock.java b/bramble-api/src/test/java/org/briarproject/bramble/test/ArrayClock.java new file mode 100644 index 000000000..fc7c40e68 --- /dev/null +++ b/bramble-api/src/test/java/org/briarproject/bramble/test/ArrayClock.java @@ -0,0 +1,23 @@ +package org.briarproject.bramble.test; + +import org.briarproject.bramble.api.system.Clock; + +public class ArrayClock implements Clock { + + private final long[] times; + private int index = 0; + + public ArrayClock(long... times) { + this.times = times; + } + + @Override + public long currentTimeMillis() { + return times[index++]; + } + + @Override + public void sleep(long milliseconds) throws InterruptedException { + Thread.sleep(milliseconds); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index d62bd6d38..ca958a7ed 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -421,7 +421,7 @@ interface Database { * Read-only. */ Collection getMessagesToOffer(T txn, ContactId c, - int maxMessages) throws DbException; + int maxMessages, int maxLatency) throws DbException; /** * Returns the IDs of some messages that are eligible to be requested from @@ -438,8 +438,8 @@ interface Database { *

* Read-only. */ - Collection getMessagesToSend(T txn, ContactId c, int maxLength) - throws DbException; + Collection getMessagesToSend(T txn, ContactId c, int maxLength, + int maxLatency) throws DbException; /** * Returns the IDs of any messages that need to be validated. @@ -482,7 +482,7 @@ interface Database { * Read-only. */ Collection getRequestedMessagesToSend(T txn, ContactId c, - int maxLength) throws DbException; + int maxLength, int maxLatency) throws DbException; /** * Returns all settings in the given namespace. @@ -647,11 +647,11 @@ interface Database { throws DbException; /** - * Updates the transmission count and expiry time of the given message - * with respect to the given contact, using the latency of the transport - * over which it was sent. + * Updates the transmission count, expiry time and estimated time of arrival + * of the given message with respect to the given contact, using the latency + * of the transport over which it was sent. */ - void updateExpiryTime(T txn, ContactId c, MessageId m, int maxLatency) + void updateExpiryTimeAndEta(T txn, ContactId c, MessageId m, int maxLatency) throws DbException; /** diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 06d295d42..99f9e7c10 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -313,11 +313,12 @@ class DatabaseComponentImpl implements DatabaseComponent { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - Collection ids = db.getMessagesToSend(txn, c, maxLength); + Collection ids = + db.getMessagesToSend(txn, c, maxLength, maxLatency); List messages = new ArrayList<>(ids.size()); for (MessageId m : ids) { messages.add(db.getMessage(txn, m)); - db.updateExpiryTime(txn, c, m, maxLatency); + db.updateExpiryTimeAndEta(txn, c, m, maxLatency); } if (ids.isEmpty()) return null; db.lowerRequestedFlag(txn, c, ids); @@ -333,9 +334,11 @@ class DatabaseComponentImpl implements DatabaseComponent { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - Collection ids = db.getMessagesToOffer(txn, c, maxMessages); + Collection ids = + db.getMessagesToOffer(txn, c, maxMessages, maxLatency); if (ids.isEmpty()) return null; - for (MessageId m : ids) db.updateExpiryTime(txn, c, m, maxLatency); + for (MessageId m : ids) + db.updateExpiryTimeAndEta(txn, c, m, maxLatency); return new Offer(ids); } @@ -362,12 +365,12 @@ class DatabaseComponentImpl implements DatabaseComponent { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - Collection ids = db.getRequestedMessagesToSend(txn, c, - maxLength); + Collection ids = + db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency); List messages = new ArrayList<>(ids.size()); for (MessageId m : ids) { messages.add(db.getMessage(txn, m)); - db.updateExpiryTime(txn, c, m, maxLatency); + db.updateExpiryTimeAndEta(txn, c, m, maxLatency); } if (ids.isEmpty()) return null; db.lowerRequestedFlag(txn, c, ids); @@ -855,7 +858,8 @@ class DatabaseComponentImpl implements DatabaseComponent { if (!db.containsMessage(txn, m)) throw new NoSuchMessageException(); if (db.getMessageState(txn, m) != DELIVERED) - throw new IllegalArgumentException("Shared undelivered message"); + throw new IllegalArgumentException( + "Shared undelivered message"); db.setMessageShared(txn, m); transaction.attach(new MessageSharedEvent(m)); } @@ -881,7 +885,8 @@ class DatabaseComponentImpl implements DatabaseComponent { throw new NoSuchMessageException(); State dependentState = db.getMessageState(txn, dependent.getId()); for (MessageId dependency : dependencies) { - db.addMessageDependency(txn, dependent, dependency, dependentState); + db.addMessageDependency(txn, dependent, dependency, + dependentState); } } @@ -913,7 +918,8 @@ class DatabaseComponentImpl implements DatabaseComponent { T txn = unbox(transaction); for (KeySet ks : keys) { TransportId t = ks.getTransportKeys().getTransportId(); - if (db.containsTransport(txn, t)) db.updateTransportKeys(txn, ks); + if (db.containsTransport(txn, t)) + db.updateTransportKeys(txn, ks); } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 76eb96c47..eab7bffc5 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -38,6 +38,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -55,7 +56,6 @@ import java.util.logging.Logger; import javax.annotation.Nullable; import static java.sql.Types.INTEGER; -import static java.util.Collections.singletonList; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.db.Metadata.REMOVE; @@ -79,7 +79,7 @@ import static org.briarproject.bramble.util.LogUtils.logException; abstract class JdbcDatabase implements Database { // Package access for testing - static final int CODE_SCHEMA_VERSION = 39; + static final int CODE_SCHEMA_VERSION = 40; // Rotation period offsets for incoming transport keys private static final int OFFSET_PREV = -1; @@ -219,6 +219,7 @@ abstract class JdbcDatabase implements Database { + " requested BOOLEAN NOT NULL," + " expiry BIGINT NOT NULL," + " txCount INT NOT NULL," + + " eta BIGINT NOT NULL," + " PRIMARY KEY (messageId, contactId)," + " FOREIGN KEY (messageId)" + " REFERENCES messages (messageId)" @@ -397,7 +398,7 @@ abstract class JdbcDatabase implements Database { // Package access for testing List> getMigrations() { - return singletonList(new Migration38_39()); + return Arrays.asList(new Migration38_39(), new Migration39_40()); } private void storeSchemaVersion(Connection txn, int version) @@ -805,8 +806,9 @@ abstract class JdbcDatabase implements Database { try { String sql = "INSERT INTO statuses (messageId, contactId, groupId," + " timestamp, length, state, groupShared, messageShared," - + " deleted, ack, seen, requested, expiry, txCount)" - + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0)"; + + " deleted, ack, seen, requested, expiry, txCount, eta)" + + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0," + + " 0)"; ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); ps.setInt(2, c.getInt()); @@ -1869,8 +1871,9 @@ abstract class JdbcDatabase implements Database { @Override public Collection getMessagesToOffer(Connection txn, - ContactId c, int maxMessages) throws DbException { + ContactId c, int maxMessages, int maxLatency) throws DbException { long now = clock.currentTimeMillis(); + long eta = now + maxLatency * 2; PreparedStatement ps = null; ResultSet rs = null; try { @@ -1879,13 +1882,14 @@ abstract class JdbcDatabase implements Database { + " AND groupShared = TRUE AND messageShared = TRUE" + " AND deleted = FALSE" + " AND seen = FALSE AND requested = FALSE" - + " AND expiry < ?" + + " AND (expiry < ? OR eta > ?)" + " ORDER BY timestamp LIMIT ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.setInt(2, DELIVERED.getValue()); ps.setLong(3, now); - ps.setInt(4, maxMessages); + ps.setLong(4, eta); + ps.setInt(5, maxMessages); rs = ps.executeQuery(); List ids = new ArrayList<>(); while (rs.next()) ids.add(new MessageId(rs.getBytes(1))); @@ -1926,8 +1930,9 @@ abstract class JdbcDatabase implements Database { @Override public Collection getMessagesToSend(Connection txn, ContactId c, - int maxLength) throws DbException { + int maxLength, int maxLatency) throws DbException { long now = clock.currentTimeMillis(); + long eta = now + maxLatency * 2; PreparedStatement ps = null; ResultSet rs = null; try { @@ -1936,12 +1941,13 @@ abstract class JdbcDatabase implements Database { + " AND groupShared = TRUE AND messageShared = TRUE" + " AND deleted = FALSE" + " AND seen = FALSE" - + " AND expiry < ?" + + " AND (expiry < ? OR eta > ?)" + " ORDER BY timestamp"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.setInt(2, DELIVERED.getValue()); ps.setLong(3, now); + ps.setLong(4, eta); rs = ps.executeQuery(); List ids = new ArrayList<>(); int total = 0; @@ -2055,8 +2061,9 @@ abstract class JdbcDatabase implements Database { @Override public Collection getRequestedMessagesToSend(Connection txn, - ContactId c, int maxLength) throws DbException { + ContactId c, int maxLength, int maxLatency) throws DbException { long now = clock.currentTimeMillis(); + long eta = now + maxLatency * 2; PreparedStatement ps = null; ResultSet rs = null; try { @@ -2065,12 +2072,13 @@ abstract class JdbcDatabase implements Database { + " AND groupShared = TRUE AND messageShared = TRUE" + " AND deleted = FALSE" + " AND seen = FALSE AND requested = TRUE" - + " AND expiry < ?" + + " AND (expiry < ? OR eta > ?)" + " ORDER BY timestamp"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.setInt(2, DELIVERED.getValue()); ps.setLong(3, now); + ps.setLong(4, eta); rs = ps.executeQuery(); List ids = new ArrayList<>(); int total = 0; @@ -2881,7 +2889,7 @@ abstract class JdbcDatabase implements Database { } @Override - public void updateExpiryTime(Connection txn, ContactId c, MessageId m, + public void updateExpiryTimeAndEta(Connection txn, ContactId c, MessageId m, int maxLatency) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -2897,13 +2905,16 @@ abstract class JdbcDatabase implements Database { if (rs.next()) throw new DbStateException(); rs.close(); ps.close(); - sql = "UPDATE statuses SET expiry = ?, txCount = txCount + 1" + sql = "UPDATE statuses" + + " SET expiry = ?, txCount = txCount + 1, eta = ?" + " WHERE messageId = ? AND contactId = ?"; ps = txn.prepareStatement(sql); long now = clock.currentTimeMillis(); + long eta = now + maxLatency * 2; ps.setLong(1, calculateExpiry(now, maxLatency, txCount)); - ps.setBytes(2, m.getBytes()); - ps.setInt(3, c.getInt()); + ps.setLong(2, eta); + ps.setBytes(3, m.getBytes()); + ps.setInt(4, c.getInt()); int affected = ps.executeUpdate(); if (affected != 1) throw new DbStateException(); ps.close(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Migration39_40.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration39_40.java new file mode 100644 index 000000000..596594b52 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration39_40.java @@ -0,0 +1,54 @@ +package org.briarproject.bramble.db; + +import org.briarproject.bramble.api.db.DbException; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.util.LogUtils.logException; + +class Migration39_40 implements Migration { + + private static final Logger LOG = + Logger.getLogger(Migration39_40.class.getName()); + + @Override + public int getStartVersion() { + return 39; + } + + @Override + public int getEndVersion() { + return 40; + } + + @Override + public void migrate(Connection txn) throws DbException { + Statement s = null; + try { + s = txn.createStatement(); + s.execute("ALTER TABLE statuses" + + " ADD eta BIGINT"); + s.execute("UPDATE statuses SET eta = 0"); + s.execute("ALTER TABLE statuses" + + " ALTER COLUMN eta" + + " SET NOT NULL"); + } catch (SQLException e) { + tryToClose(s); + throw new DbException(e); + } + } + + private void tryToClose(@Nullable Statement s) { + try { + if (s != null) s.close(); + } catch (SQLException e) { + logException(LOG, WARNING, e); + } + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/crypto/ScryptKdfTest.java b/bramble-core/src/test/java/org/briarproject/bramble/crypto/ScryptKdfTest.java index f7cbfe05e..b7356f00a 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/crypto/ScryptKdfTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/crypto/ScryptKdfTest.java @@ -11,6 +11,8 @@ import java.util.HashSet; import java.util.Set; import static junit.framework.TestCase.assertTrue; + +import org.briarproject.bramble.test.ArrayClock; import static org.briarproject.bramble.test.TestUtils.getRandomBytes; import static org.briarproject.bramble.util.StringUtils.getRandomString; import static org.junit.Assert.assertEquals; @@ -74,24 +76,4 @@ public class ScryptKdfTest extends BrambleTestCase { PasswordBasedKdf kdf = new ScryptKdf(clock); assertEquals(256, kdf.chooseCostParameter()); } - - private static class ArrayClock implements Clock { - - private final long[] times; - private int index = 0; - - private ArrayClock(long... times) { - this.times = times; - } - - @Override - public long currentTimeMillis() { - return times[index++]; - } - - @Override - public void sleep(long milliseconds) throws InterruptedException { - Thread.sleep(milliseconds); - } - } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index ed3780026..5096b91ec 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -871,15 +871,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { oneOf(database).containsContact(txn, contactId); will(returnValue(true)); oneOf(database).getMessagesToSend(txn, contactId, - MAX_MESSAGE_LENGTH * 2); + MAX_MESSAGE_LENGTH * 2, maxLatency); will(returnValue(ids)); oneOf(database).getMessage(txn, messageId); will(returnValue(message)); - oneOf(database).updateExpiryTime(txn, contactId, messageId, + oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId, maxLatency); oneOf(database).getMessage(txn, messageId1); will(returnValue(message1)); - oneOf(database).updateExpiryTime(txn, contactId, messageId1, + oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1, maxLatency); oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).commitTransaction(txn); @@ -907,11 +907,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { will(returnValue(txn)); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).getMessagesToOffer(txn, contactId, 123); + oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency); will(returnValue(ids)); - oneOf(database).updateExpiryTime(txn, contactId, messageId, + oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId, maxLatency); - oneOf(database).updateExpiryTime(txn, contactId, messageId1, + oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1, maxLatency); oneOf(database).commitTransaction(txn); }}); @@ -967,15 +967,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { oneOf(database).containsContact(txn, contactId); will(returnValue(true)); oneOf(database).getRequestedMessagesToSend(txn, contactId, - MAX_MESSAGE_LENGTH * 2); + MAX_MESSAGE_LENGTH * 2, maxLatency); will(returnValue(ids)); oneOf(database).getMessage(txn, messageId); will(returnValue(message)); - oneOf(database).updateExpiryTime(txn, contactId, messageId, + oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId, maxLatency); oneOf(database).getMessage(txn, messageId1); will(returnValue(message1)); - oneOf(database).updateExpiryTime(txn, contactId, messageId1, + oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1, maxLatency); oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).commitTransaction(txn); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java index f20454bb6..55e5e5ade 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java @@ -96,6 +96,9 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase { */ private static final int STEADY_STATE_BLOCKS = 5; + // All our transports use a maximum latency of 30 seconds + private static final int MAX_LATENCY = 30 * 1000; + protected final File testDir = getTestDirectory(); private final File resultsFile = new File(getTestName() + ".tsv"); protected final Random random = new Random(); @@ -448,7 +451,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase { benchmark(name, db -> { Connection txn = db.startTransaction(); db.getMessagesToOffer(txn, pickRandom(contacts).getId(), - MAX_MESSAGE_IDS); + MAX_MESSAGE_IDS, MAX_LATENCY); db.commitTransaction(txn); }); } @@ -470,7 +473,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase { benchmark(name, db -> { Connection txn = db.startTransaction(); db.getMessagesToSend(txn, pickRandom(contacts).getId(), - MAX_MESSAGE_IDS); + MAX_MESSAGE_IDS, MAX_LATENCY); db.commitTransaction(txn); }); } @@ -521,7 +524,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase { benchmark(name, db -> { Connection txn = db.startTransaction(); db.getRequestedMessagesToSend(txn, pickRandom(contacts).getId(), - MAX_MESSAGE_IDS); + MAX_MESSAGE_IDS, MAX_LATENCY); db.commitTransaction(txn); }); } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index 6e434f90c..55b908b06 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -26,6 +26,7 @@ import org.briarproject.bramble.api.transport.KeySetId; import org.briarproject.bramble.api.transport.OutgoingKeys; import org.briarproject.bramble.api.transport.TransportKeys; import org.briarproject.bramble.system.SystemClock; +import org.briarproject.bramble.test.ArrayClock; import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.TestDatabaseConfig; import org.briarproject.bramble.test.TestMessageFactory; @@ -60,6 +61,7 @@ import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERE import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID; import static org.briarproject.bramble.api.sync.ValidationManager.State.PENDING; import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN; +import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory; import static org.briarproject.bramble.test.TestUtils.getAuthor; import static org.briarproject.bramble.test.TestUtils.getClientId; import static org.briarproject.bramble.test.TestUtils.getGroup; @@ -82,6 +84,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { private static final int ONE_MEGABYTE = 1024 * 1024; private static final int MAX_SIZE = 5 * ONE_MEGABYTE; + // All our transports use a maximum latency of 30 seconds + private static final int MAX_LATENCY = 30 * 1000; + private final SecretKey key = getSecretKey(); private final File testDir = getTestDirectory(); @@ -199,16 +204,16 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The contact has not seen the message, so it should be sendable Collection ids = - db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); // Changing the status to seen = true should make the message unsendable db.raiseSeenFlag(txn, contactId, messageId); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); db.commitTransaction(txn); @@ -230,30 +235,30 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The message has not been validated, so it should not be sendable Collection ids = db.getMessagesToSend(txn, contactId, - ONE_MEGABYTE); + ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); // Marking the message delivered should make it sendable db.setMessageState(txn, messageId, DELIVERED); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); // Marking the message invalid should make it unsendable db.setMessageState(txn, messageId, INVALID); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); // Marking the message pending should make it unsendable db.setMessageState(txn, messageId, PENDING); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); db.commitTransaction(txn); @@ -274,37 +279,37 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The group is invisible, so the message should not be sendable Collection ids = db.getMessagesToSend(txn, contactId, - ONE_MEGABYTE); + ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); // Making the group visible should not make the message sendable db.addGroupVisibility(txn, contactId, groupId, false); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); // Sharing the group should make the message sendable db.setGroupVisibility(txn, contactId, groupId, true); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); // Unsharing the group should make the message unsendable db.setGroupVisibility(txn, contactId, groupId, false); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); // Making the group invisible should make the message unsendable db.removeGroupVisibility(txn, contactId, groupId); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); db.commitTransaction(txn); @@ -326,16 +331,16 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The message is not shared, so it should not be sendable Collection ids = db.getMessagesToSend(txn, contactId, - ONE_MEGABYTE); + ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); // Sharing the message should make it sendable db.setMessageShared(txn, messageId); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); db.commitTransaction(txn); @@ -356,12 +361,13 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { db.addMessage(txn, message, DELIVERED, true, null); // The message is sendable, but too large to send - Collection ids = db.getMessagesToSend(txn, contactId, - message.getRawLength() - 1); + Collection ids = + db.getMessagesToSend(txn, contactId, message.getRawLength() - 1, + MAX_LATENCY); assertTrue(ids.isEmpty()); - // The message is just the right size to send - ids = db.getMessagesToSend(txn, contactId, message.getRawLength()); + ids = db.getMessagesToSend(txn, contactId, message.getRawLength(), + MAX_LATENCY); assertEquals(singletonList(messageId), ids); db.commitTransaction(txn); @@ -424,19 +430,19 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // Retrieve the message from the database and mark it as sent Collection ids = db.getMessagesToSend(txn, contactId, - ONE_MEGABYTE); + ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); - db.updateExpiryTime(txn, contactId, messageId, Integer.MAX_VALUE); + db.updateExpiryTimeAndEta(txn, contactId, messageId, Integer.MAX_VALUE); // The message should no longer be sendable - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); // Pretend that the message was acked db.raiseSeenFlag(txn, contactId, messageId); // The message still should not be sendable - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); db.commitTransaction(txn); @@ -1517,7 +1523,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertFalse(status.isSeen()); // Pretend the message was sent to the contact - db.updateExpiryTime(txn, contactId, messageId, Integer.MAX_VALUE); + db.updateExpiryTimeAndEta(txn, contactId, messageId, Integer.MAX_VALUE); // The message should be sent but not seen status = db.getMessageStatus(txn, contactId, messageId); @@ -1636,9 +1642,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The message should be sendable Collection ids = db.getMessagesToSend(txn, contactId, - ONE_MEGABYTE); + ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); // The message should be available @@ -1655,9 +1661,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(db.containsVisibleMessage(txn, contactId, messageId)); // The message should not be sendable - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); + ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); // Requesting the message should throw an exception @@ -1761,12 +1767,12 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // Update the message's expiry time as though we sent it - now the // message should be sendable after one round-trip - db.updateExpiryTime(txn, contactId, messageId, 1000); + db.updateExpiryTimeAndEta(txn, contactId, messageId, 1000); assertEquals(now + 2000, db.getNextSendTime(txn, contactId)); // Update the message's expiry time again - now it should be sendable // after two round-trips - db.updateExpiryTime(txn, contactId, messageId, 1000); + db.updateExpiryTimeAndEta(txn, contactId, messageId, 1000); assertEquals(now + 4000, db.getNextSendTime(txn, contactId)); // Delete the message - there should be no messages to send @@ -1809,6 +1815,52 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { db.close(); } + @Test + public void testFasterMessageRetransmission() throws Exception { + long now = System.currentTimeMillis(); + long steps[] = {now, now, now + MAX_LATENCY, now + MAX_LATENCY, + now + 1 + MAX_LATENCY * 2}; + Database db = open(false, new ArrayClock(steps)); + Connection txn = db.startTransaction(); + + // Add a contact, a shared group and a shared message + db.addLocalAuthor(txn, localAuthor); + assertEquals(contactId, db.addContact(txn, author, localAuthor.getId(), + true, true)); + db.addGroup(txn, group); + db.addGroupVisibility(txn, contactId, groupId, true); + db.addMessage(txn, message, DELIVERED, true, null); + + // Time: now + // Retrieve the message from the database and mark it as sent + Collection ids = db.getMessagesToSend(txn, contactId, + ONE_MEGABYTE, MAX_LATENCY); + assertEquals(singletonList(messageId), ids); + // Time: now + db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY); + + // Time: now + MAX_LATENCY + // The message should no longer be sendable via transports with + // with an equal or higher ETA + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); + assertTrue(ids.isEmpty()); + + // Time: now + MAX_LATENCY + // The message should be sendable via a transport with a faster ETA + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, + MAX_LATENCY / 2 - 1); + assertEquals(singletonList(messageId), ids); + + // Time: now + 1 + MAX_LATENCY * 2 + // The message expired and should be sendable by every transport. + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); + assertEquals(singletonList(messageId), ids); + + db.commitTransaction(txn); + db.close(); + } + + private Database open(boolean resume) throws Exception { return open(resume, new TestMessageFactory(), new SystemClock()); } @@ -1846,7 +1898,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { @After public void tearDown() { - TestUtils.deleteTestDirectory(testDir); + deleteTestDirectory(testDir); } private static class StoppedClock implements Clock {