Merge branch '541-faster-retransmission-eta' into 'master'

Allow retransmission if it will result in faster delivery

Closes #541

See merge request briar/briar!908
This commit is contained in:
akwizgran
2018-09-18 14:26:22 +00:00
9 changed files with 290 additions and 108 deletions

View File

@@ -0,0 +1,23 @@
package org.briarproject.bramble.test;
import org.briarproject.bramble.api.system.Clock;
public class ArrayClock implements Clock {
private final long[] times;
private int index = 0;
public ArrayClock(long... times) {
this.times = times;
}
@Override
public long currentTimeMillis() {
return times[index++];
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);
}
}

View File

@@ -421,7 +421,7 @@ interface Database<T> {
* Read-only. * Read-only.
*/ */
Collection<MessageId> getMessagesToOffer(T txn, ContactId c, Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException; int maxMessages, int maxLatency) throws DbException;
/** /**
* Returns the IDs of some messages that are eligible to be requested from * Returns the IDs of some messages that are eligible to be requested from
@@ -438,8 +438,8 @@ interface Database<T> {
* <p/> * <p/>
* Read-only. * Read-only.
*/ */
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength) Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
throws DbException; int maxLatency) throws DbException;
/** /**
* Returns the IDs of any messages that need to be validated. * Returns the IDs of any messages that need to be validated.
@@ -482,7 +482,7 @@ interface Database<T> {
* Read-only. * Read-only.
*/ */
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c, Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
int maxLength) throws DbException; int maxLength, int maxLatency) throws DbException;
/** /**
* Returns all settings in the given namespace. * Returns all settings in the given namespace.
@@ -647,11 +647,11 @@ interface Database<T> {
throws DbException; throws DbException;
/** /**
* Updates the transmission count and expiry time of the given message * Updates the transmission count, expiry time and estimated time of arrival
* with respect to the given contact, using the latency of the transport * of the given message with respect to the given contact, using the latency
* over which it was sent. * of the transport over which it was sent.
*/ */
void updateExpiryTime(T txn, ContactId c, MessageId m, int maxLatency) void updateExpiryTimeAndEta(T txn, ContactId c, MessageId m, int maxLatency)
throws DbException; throws DbException;
/** /**

View File

@@ -313,11 +313,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction); T txn = unbox(transaction);
if (!db.containsContact(txn, c)) if (!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToSend(txn, c, maxLength); Collection<MessageId> ids =
db.getMessagesToSend(txn, c, maxLength, maxLatency);
List<Message> messages = new ArrayList<>(ids.size()); List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) { for (MessageId m : ids) {
messages.add(db.getMessage(txn, m)); messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency); db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
} }
if (ids.isEmpty()) return null; if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids); db.lowerRequestedFlag(txn, c, ids);
@@ -333,9 +334,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction); T txn = unbox(transaction);
if (!db.containsContact(txn, c)) if (!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToOffer(txn, c, maxMessages); Collection<MessageId> ids =
db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
if (ids.isEmpty()) return null; if (ids.isEmpty()) return null;
for (MessageId m : ids) db.updateExpiryTime(txn, c, m, maxLatency); for (MessageId m : ids)
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
return new Offer(ids); return new Offer(ids);
} }
@@ -362,12 +365,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction); T txn = unbox(transaction);
if (!db.containsContact(txn, c)) if (!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
Collection<MessageId> ids = db.getRequestedMessagesToSend(txn, c, Collection<MessageId> ids =
maxLength); db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency);
List<Message> messages = new ArrayList<>(ids.size()); List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) { for (MessageId m : ids) {
messages.add(db.getMessage(txn, m)); messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency); db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
} }
if (ids.isEmpty()) return null; if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids); db.lowerRequestedFlag(txn, c, ids);
@@ -855,7 +858,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsMessage(txn, m)) if (!db.containsMessage(txn, m))
throw new NoSuchMessageException(); throw new NoSuchMessageException();
if (db.getMessageState(txn, m) != DELIVERED) if (db.getMessageState(txn, m) != DELIVERED)
throw new IllegalArgumentException("Shared undelivered message"); throw new IllegalArgumentException(
"Shared undelivered message");
db.setMessageShared(txn, m); db.setMessageShared(txn, m);
transaction.attach(new MessageSharedEvent(m)); transaction.attach(new MessageSharedEvent(m));
} }
@@ -881,7 +885,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchMessageException(); throw new NoSuchMessageException();
State dependentState = db.getMessageState(txn, dependent.getId()); State dependentState = db.getMessageState(txn, dependent.getId());
for (MessageId dependency : dependencies) { for (MessageId dependency : dependencies) {
db.addMessageDependency(txn, dependent, dependency, dependentState); db.addMessageDependency(txn, dependent, dependency,
dependentState);
} }
} }
@@ -913,7 +918,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction); T txn = unbox(transaction);
for (KeySet ks : keys) { for (KeySet ks : keys) {
TransportId t = ks.getTransportKeys().getTransportId(); TransportId t = ks.getTransportKeys().getTransportId();
if (db.containsTransport(txn, t)) db.updateTransportKeys(txn, ks); if (db.containsTransport(txn, t))
db.updateTransportKeys(txn, ks);
} }
} }
} }

View File

@@ -38,6 +38,7 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@@ -55,7 +56,6 @@ import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import static java.sql.Types.INTEGER; import static java.sql.Types.INTEGER;
import static java.util.Collections.singletonList;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.db.Metadata.REMOVE; import static org.briarproject.bramble.api.db.Metadata.REMOVE;
@@ -79,7 +79,7 @@ import static org.briarproject.bramble.util.LogUtils.logException;
abstract class JdbcDatabase implements Database<Connection> { abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing // Package access for testing
static final int CODE_SCHEMA_VERSION = 39; static final int CODE_SCHEMA_VERSION = 40;
// Rotation period offsets for incoming transport keys // Rotation period offsets for incoming transport keys
private static final int OFFSET_PREV = -1; private static final int OFFSET_PREV = -1;
@@ -219,6 +219,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " requested BOOLEAN NOT NULL," + " requested BOOLEAN NOT NULL,"
+ " expiry BIGINT NOT NULL," + " expiry BIGINT NOT NULL,"
+ " txCount INT NOT NULL," + " txCount INT NOT NULL,"
+ " eta BIGINT NOT NULL,"
+ " PRIMARY KEY (messageId, contactId)," + " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (messageId)" + " FOREIGN KEY (messageId)"
+ " REFERENCES messages (messageId)" + " REFERENCES messages (messageId)"
@@ -397,7 +398,7 @@ abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing // Package access for testing
List<Migration<Connection>> getMigrations() { List<Migration<Connection>> getMigrations() {
return singletonList(new Migration38_39()); return Arrays.asList(new Migration38_39(), new Migration39_40());
} }
private void storeSchemaVersion(Connection txn, int version) private void storeSchemaVersion(Connection txn, int version)
@@ -805,8 +806,9 @@ abstract class JdbcDatabase implements Database<Connection> {
try { try {
String sql = "INSERT INTO statuses (messageId, contactId, groupId," String sql = "INSERT INTO statuses (messageId, contactId, groupId,"
+ " timestamp, length, state, groupShared, messageShared," + " timestamp, length, state, groupShared, messageShared,"
+ " deleted, ack, seen, requested, expiry, txCount)" + " deleted, ack, seen, requested, expiry, txCount, eta)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0)"; + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0,"
+ " 0)";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes()); ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt()); ps.setInt(2, c.getInt());
@@ -1869,8 +1871,9 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override @Override
public Collection<MessageId> getMessagesToOffer(Connection txn, public Collection<MessageId> getMessagesToOffer(Connection txn,
ContactId c, int maxMessages) throws DbException { ContactId c, int maxMessages, int maxLatency) throws DbException {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
@@ -1879,13 +1882,14 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE" + " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE" + " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = FALSE" + " AND seen = FALSE AND requested = FALSE"
+ " AND expiry < ?" + " AND (expiry < ? OR eta > ?)"
+ " ORDER BY timestamp LIMIT ?"; + " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue()); ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now); ps.setLong(3, now);
ps.setInt(4, maxMessages); ps.setLong(4, eta);
ps.setInt(5, maxMessages);
rs = ps.executeQuery(); rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>(); List<MessageId> ids = new ArrayList<>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1))); while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
@@ -1926,8 +1930,9 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override @Override
public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c, public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c,
int maxLength) throws DbException { int maxLength, int maxLatency) throws DbException {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
@@ -1936,12 +1941,13 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE" + " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE" + " AND deleted = FALSE"
+ " AND seen = FALSE" + " AND seen = FALSE"
+ " AND expiry < ?" + " AND (expiry < ? OR eta > ?)"
+ " ORDER BY timestamp"; + " ORDER BY timestamp";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue()); ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now); ps.setLong(3, now);
ps.setLong(4, eta);
rs = ps.executeQuery(); rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>(); List<MessageId> ids = new ArrayList<>();
int total = 0; int total = 0;
@@ -2055,8 +2061,9 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override @Override
public Collection<MessageId> getRequestedMessagesToSend(Connection txn, public Collection<MessageId> getRequestedMessagesToSend(Connection txn,
ContactId c, int maxLength) throws DbException { ContactId c, int maxLength, int maxLatency) throws DbException {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
@@ -2065,12 +2072,13 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE" + " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE" + " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = TRUE" + " AND seen = FALSE AND requested = TRUE"
+ " AND expiry < ?" + " AND (expiry < ? OR eta > ?)"
+ " ORDER BY timestamp"; + " ORDER BY timestamp";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue()); ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now); ps.setLong(3, now);
ps.setLong(4, eta);
rs = ps.executeQuery(); rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>(); List<MessageId> ids = new ArrayList<>();
int total = 0; int total = 0;
@@ -2881,7 +2889,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
@Override @Override
public void updateExpiryTime(Connection txn, ContactId c, MessageId m, public void updateExpiryTimeAndEta(Connection txn, ContactId c, MessageId m,
int maxLatency) throws DbException { int maxLatency) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
@@ -2897,13 +2905,16 @@ abstract class JdbcDatabase implements Database<Connection> {
if (rs.next()) throw new DbStateException(); if (rs.next()) throw new DbStateException();
rs.close(); rs.close();
ps.close(); ps.close();
sql = "UPDATE statuses SET expiry = ?, txCount = txCount + 1" sql = "UPDATE statuses"
+ " SET expiry = ?, txCount = txCount + 1, eta = ?"
+ " WHERE messageId = ? AND contactId = ?"; + " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
long eta = now + maxLatency;
ps.setLong(1, calculateExpiry(now, maxLatency, txCount)); ps.setLong(1, calculateExpiry(now, maxLatency, txCount));
ps.setBytes(2, m.getBytes()); ps.setLong(2, eta);
ps.setInt(3, c.getInt()); ps.setBytes(3, m.getBytes());
ps.setInt(4, c.getInt());
int affected = ps.executeUpdate(); int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException(); if (affected != 1) throw new DbStateException();
ps.close(); ps.close();

View File

@@ -0,0 +1,54 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.util.LogUtils.logException;
class Migration39_40 implements Migration<Connection> {
private static final Logger LOG =
Logger.getLogger(Migration39_40.class.getName());
@Override
public int getStartVersion() {
return 39;
}
@Override
public int getEndVersion() {
return 40;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("ALTER TABLE statuses"
+ " ADD eta BIGINT");
s.execute("UPDATE statuses SET eta = 0");
s.execute("ALTER TABLE statuses"
+ " ALTER COLUMN eta"
+ " SET NOT NULL");
} catch (SQLException e) {
tryToClose(s);
throw new DbException(e);
}
}
private void tryToClose(@Nullable Statement s) {
try {
if (s != null) s.close();
} catch (SQLException e) {
logException(LOG, WARNING, e);
}
}
}

View File

@@ -11,6 +11,8 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.assertTrue;
import org.briarproject.bramble.test.ArrayClock;
import static org.briarproject.bramble.test.TestUtils.getRandomBytes; import static org.briarproject.bramble.test.TestUtils.getRandomBytes;
import static org.briarproject.bramble.util.StringUtils.getRandomString; import static org.briarproject.bramble.util.StringUtils.getRandomString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@@ -74,24 +76,4 @@ public class ScryptKdfTest extends BrambleTestCase {
PasswordBasedKdf kdf = new ScryptKdf(clock); PasswordBasedKdf kdf = new ScryptKdf(clock);
assertEquals(256, kdf.chooseCostParameter()); assertEquals(256, kdf.chooseCostParameter());
} }
private static class ArrayClock implements Clock {
private final long[] times;
private int index = 0;
private ArrayClock(long... times) {
this.times = times;
}
@Override
public long currentTimeMillis() {
return times[index++];
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);
}
}
} }

View File

@@ -871,15 +871,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getMessagesToSend(txn, contactId, oneOf(database).getMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2); MAX_MESSAGE_LENGTH * 2, maxLatency);
will(returnValue(ids)); will(returnValue(ids));
oneOf(database).getMessage(txn, messageId); oneOf(database).getMessage(txn, messageId);
will(returnValue(message)); will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency); maxLatency);
oneOf(database).getMessage(txn, messageId1); oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1)); will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
maxLatency); maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
@@ -907,11 +907,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getMessagesToOffer(txn, contactId, 123); oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency);
will(returnValue(ids)); will(returnValue(ids));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency); maxLatency);
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
maxLatency); maxLatency);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
@@ -967,15 +967,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getRequestedMessagesToSend(txn, contactId, oneOf(database).getRequestedMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2); MAX_MESSAGE_LENGTH * 2, maxLatency);
will(returnValue(ids)); will(returnValue(ids));
oneOf(database).getMessage(txn, messageId); oneOf(database).getMessage(txn, messageId);
will(returnValue(message)); will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency); maxLatency);
oneOf(database).getMessage(txn, messageId1); oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1)); will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
maxLatency); maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);

View File

@@ -96,6 +96,9 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
*/ */
private static final int STEADY_STATE_BLOCKS = 5; private static final int STEADY_STATE_BLOCKS = 5;
// All our transports use a maximum latency of 30 seconds
private static final int MAX_LATENCY = 30 * 1000;
protected final File testDir = getTestDirectory(); protected final File testDir = getTestDirectory();
private final File resultsFile = new File(getTestName() + ".tsv"); private final File resultsFile = new File(getTestName() + ".tsv");
protected final Random random = new Random(); protected final Random random = new Random();
@@ -448,7 +451,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
benchmark(name, db -> { benchmark(name, db -> {
Connection txn = db.startTransaction(); Connection txn = db.startTransaction();
db.getMessagesToOffer(txn, pickRandom(contacts).getId(), db.getMessagesToOffer(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS); MAX_MESSAGE_IDS, MAX_LATENCY);
db.commitTransaction(txn); db.commitTransaction(txn);
}); });
} }
@@ -470,7 +473,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
benchmark(name, db -> { benchmark(name, db -> {
Connection txn = db.startTransaction(); Connection txn = db.startTransaction();
db.getMessagesToSend(txn, pickRandom(contacts).getId(), db.getMessagesToSend(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS); MAX_MESSAGE_IDS, MAX_LATENCY);
db.commitTransaction(txn); db.commitTransaction(txn);
}); });
} }
@@ -521,7 +524,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
benchmark(name, db -> { benchmark(name, db -> {
Connection txn = db.startTransaction(); Connection txn = db.startTransaction();
db.getRequestedMessagesToSend(txn, pickRandom(contacts).getId(), db.getRequestedMessagesToSend(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS); MAX_MESSAGE_IDS, MAX_LATENCY);
db.commitTransaction(txn); db.commitTransaction(txn);
}); });
} }

View File

@@ -26,6 +26,7 @@ import org.briarproject.bramble.api.transport.KeySetId;
import org.briarproject.bramble.api.transport.OutgoingKeys; import org.briarproject.bramble.api.transport.OutgoingKeys;
import org.briarproject.bramble.api.transport.TransportKeys; import org.briarproject.bramble.api.transport.TransportKeys;
import org.briarproject.bramble.system.SystemClock; import org.briarproject.bramble.system.SystemClock;
import org.briarproject.bramble.test.ArrayClock;
import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.TestDatabaseConfig; import org.briarproject.bramble.test.TestDatabaseConfig;
import org.briarproject.bramble.test.TestMessageFactory; import org.briarproject.bramble.test.TestMessageFactory;
@@ -60,6 +61,7 @@ import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERE
import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID; import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID;
import static org.briarproject.bramble.api.sync.ValidationManager.State.PENDING; import static org.briarproject.bramble.api.sync.ValidationManager.State.PENDING;
import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN; import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN;
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
import static org.briarproject.bramble.test.TestUtils.getAuthor; import static org.briarproject.bramble.test.TestUtils.getAuthor;
import static org.briarproject.bramble.test.TestUtils.getClientId; import static org.briarproject.bramble.test.TestUtils.getClientId;
import static org.briarproject.bramble.test.TestUtils.getGroup; import static org.briarproject.bramble.test.TestUtils.getGroup;
@@ -82,6 +84,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
private static final int ONE_MEGABYTE = 1024 * 1024; private static final int ONE_MEGABYTE = 1024 * 1024;
private static final int MAX_SIZE = 5 * ONE_MEGABYTE; private static final int MAX_SIZE = 5 * ONE_MEGABYTE;
// All our transports use a maximum latency of 30 seconds
private static final int MAX_LATENCY = 30 * 1000;
private final SecretKey key = getSecretKey(); private final SecretKey key = getSecretKey();
private final File testDir = getTestDirectory(); private final File testDir = getTestDirectory();
@@ -199,16 +204,16 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// The contact has not seen the message, so it should be sendable // The contact has not seen the message, so it should be sendable
Collection<MessageId> ids = Collection<MessageId> ids =
db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
// Changing the status to seen = true should make the message unsendable // Changing the status to seen = true should make the message unsendable
db.raiseSeenFlag(txn, contactId, messageId); db.raiseSeenFlag(txn, contactId, messageId);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -230,30 +235,30 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// The message has not been validated, so it should not be sendable // The message has not been validated, so it should not be sendable
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE); ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Marking the message delivered should make it sendable // Marking the message delivered should make it sendable
db.setMessageState(txn, messageId, DELIVERED); db.setMessageState(txn, messageId, DELIVERED);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
// Marking the message invalid should make it unsendable // Marking the message invalid should make it unsendable
db.setMessageState(txn, messageId, INVALID); db.setMessageState(txn, messageId, INVALID);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Marking the message pending should make it unsendable // Marking the message pending should make it unsendable
db.setMessageState(txn, messageId, PENDING); db.setMessageState(txn, messageId, PENDING);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -274,37 +279,37 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// The group is invisible, so the message should not be sendable // The group is invisible, so the message should not be sendable
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE); ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Making the group visible should not make the message sendable // Making the group visible should not make the message sendable
db.addGroupVisibility(txn, contactId, groupId, false); db.addGroupVisibility(txn, contactId, groupId, false);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Sharing the group should make the message sendable // Sharing the group should make the message sendable
db.setGroupVisibility(txn, contactId, groupId, true); db.setGroupVisibility(txn, contactId, groupId, true);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
// Unsharing the group should make the message unsendable // Unsharing the group should make the message unsendable
db.setGroupVisibility(txn, contactId, groupId, false); db.setGroupVisibility(txn, contactId, groupId, false);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Making the group invisible should make the message unsendable // Making the group invisible should make the message unsendable
db.removeGroupVisibility(txn, contactId, groupId); db.removeGroupVisibility(txn, contactId, groupId);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -326,16 +331,16 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// The message is not shared, so it should not be sendable // The message is not shared, so it should not be sendable
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE); ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Sharing the message should make it sendable // Sharing the message should make it sendable
db.setMessageShared(txn, messageId); db.setMessageShared(txn, messageId);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -356,12 +361,13 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.addMessage(txn, message, DELIVERED, true, null); db.addMessage(txn, message, DELIVERED, true, null);
// The message is sendable, but too large to send // The message is sendable, but too large to send
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, Collection<MessageId> ids =
message.getRawLength() - 1); db.getMessagesToSend(txn, contactId, message.getRawLength() - 1,
MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// The message is just the right size to send // The message is just the right size to send
ids = db.getMessagesToSend(txn, contactId, message.getRawLength()); ids = db.getMessagesToSend(txn, contactId, message.getRawLength(),
MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -424,19 +430,19 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Retrieve the message from the database and mark it as sent // Retrieve the message from the database and mark it as sent
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE); ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
db.updateExpiryTime(txn, contactId, messageId, Integer.MAX_VALUE); db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
// The message should no longer be sendable // The message should no longer be sendable
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Pretend that the message was acked // Pretend that the message was acked
db.raiseSeenFlag(txn, contactId, messageId); db.raiseSeenFlag(txn, contactId, messageId);
// The message still should not be sendable // The message still should not be sendable
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -1517,7 +1523,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertFalse(status.isSeen()); assertFalse(status.isSeen());
// Pretend the message was sent to the contact // Pretend the message was sent to the contact
db.updateExpiryTime(txn, contactId, messageId, Integer.MAX_VALUE); db.updateExpiryTimeAndEta(txn, contactId, messageId, Integer.MAX_VALUE);
// The message should be sent but not seen // The message should be sent but not seen
status = db.getMessageStatus(txn, contactId, messageId); status = db.getMessageStatus(txn, contactId, messageId);
@@ -1636,9 +1642,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// The message should be sendable // The message should be sendable
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE); ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
// The message should be available // The message should be available
@@ -1655,9 +1661,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(db.containsVisibleMessage(txn, contactId, messageId)); assertTrue(db.containsVisibleMessage(txn, contactId, messageId));
// The message should not be sendable // The message should not be sendable
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
// Requesting the message should throw an exception // Requesting the message should throw an exception
@@ -1761,12 +1767,12 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Update the message's expiry time as though we sent it - now the // Update the message's expiry time as though we sent it - now the
// message should be sendable after one round-trip // message should be sendable after one round-trip
db.updateExpiryTime(txn, contactId, messageId, 1000); db.updateExpiryTimeAndEta(txn, contactId, messageId, 1000);
assertEquals(now + 2000, db.getNextSendTime(txn, contactId)); assertEquals(now + 2000, db.getNextSendTime(txn, contactId));
// Update the message's expiry time again - now it should be sendable // Update the message's expiry time again - now it should be sendable
// after two round-trips // after two round-trips
db.updateExpiryTime(txn, contactId, messageId, 1000); db.updateExpiryTimeAndEta(txn, contactId, messageId, 1000);
assertEquals(now + 4000, db.getNextSendTime(txn, contactId)); assertEquals(now + 4000, db.getNextSendTime(txn, contactId));
// Delete the message - there should be no messages to send // Delete the message - there should be no messages to send
@@ -1809,6 +1815,103 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.close(); db.close();
} }
@Test
public void testMessageRetransmission() throws Exception {
long now = System.currentTimeMillis();
long steps[] = {now, now, now + MAX_LATENCY * 2,
now + MAX_LATENCY * 2 + 1};
Database<Connection> db =
open(false, new TestMessageFactory(), new ArrayClock(steps));
Connection txn = db.startTransaction();
// Add a contact, a shared group and a shared message
db.addLocalAuthor(txn, localAuthor);
assertEquals(contactId, db.addContact(txn, author, localAuthor.getId(),
true, true));
db.addGroup(txn, group);
db.addGroupVisibility(txn, contactId, groupId, true);
db.addMessage(txn, message, DELIVERED, true, null);
// Time: now
// Retrieve the message from the database
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids);
// Time: now
// Mark the message as sent
db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId));
// Time: now + MAX_LATENCY * 2
// The message should not yet be sendable
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty());
// Time: now + MAX_LATENCY * 2 + 1
// The message should have expired and should now be sendable
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids);
db.commitTransaction(txn);
db.close();
}
@Test
public void testFasterMessageRetransmission() throws Exception {
long now = System.currentTimeMillis();
long steps[] = {now, now, now, now, now + 1};
Database<Connection> db =
open(false, new TestMessageFactory(), new ArrayClock(steps));
Connection txn = db.startTransaction();
// Add a contact, a shared group and a shared message
db.addLocalAuthor(txn, localAuthor);
assertEquals(contactId, db.addContact(txn, author, localAuthor.getId(),
true, true));
db.addGroup(txn, group);
db.addGroupVisibility(txn, contactId, groupId, true);
db.addMessage(txn, message, DELIVERED, true, null);
// Time: now
// Retrieve the message from the database
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE, MAX_LATENCY);
assertEquals(singletonList(messageId), ids);
// Time: now
// Mark the message as sent
db.updateExpiryTimeAndEta(txn, contactId, messageId, MAX_LATENCY);
// The message should expire after 2 * MAX_LATENCY
assertEquals(now + MAX_LATENCY * 2, db.getNextSendTime(txn, contactId));
// Time: now
// The message should not be sendable via the same transport
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
assertTrue(ids.isEmpty());
// Time: now
// The message should be sendable via a transport with a faster ETA
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE,
MAX_LATENCY - 1);
assertEquals(singletonList(messageId), ids);
// Time: now + 1
// The message should no longer be sendable via the faster transport,
// as the ETA is now equal
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE,
MAX_LATENCY - 1);
assertTrue(ids.isEmpty());
db.commitTransaction(txn);
db.close();
}
private Database<Connection> open(boolean resume) throws Exception { private Database<Connection> open(boolean resume) throws Exception {
return open(resume, new TestMessageFactory(), new SystemClock()); return open(resume, new TestMessageFactory(), new SystemClock());
} }
@@ -1846,7 +1949,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
@After @After
public void tearDown() { public void tearDown() {
TestUtils.deleteTestDirectory(testDir); deleteTestDirectory(testDir);
} }
private static class StoppedClock implements Clock { private static class StoppedClock implements Clock {