mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 18:59:06 +01:00
Retransmission without backoff for messages and updates.
This commit is contained in:
@@ -113,10 +113,12 @@ public interface DatabaseComponent {
|
||||
RetentionAck generateRetentionAck(ContactId c) throws DbException;
|
||||
|
||||
/**
|
||||
* Generates a retention update for the given contact. Returns null if no
|
||||
* update is due.
|
||||
* Generates a retention update for the given contact, for transmission
|
||||
* over a transport with the given latency. Returns null if no update is
|
||||
* due.
|
||||
*/
|
||||
RetentionUpdate generateRetentionUpdate(ContactId c) throws DbException;
|
||||
RetentionUpdate generateRetentionUpdate(ContactId c, long maxLatency)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Generates a subscription ack for the given contact. Returns null if no
|
||||
@@ -125,10 +127,11 @@ public interface DatabaseComponent {
|
||||
SubscriptionAck generateSubscriptionAck(ContactId c) throws DbException;
|
||||
|
||||
/**
|
||||
* Generates a subscription update for the given contact. Returns null if
|
||||
* no update is due.
|
||||
* Generates a subscription update for the given contact, for transmission
|
||||
* over a transport with the given latency. Returns null if no update is
|
||||
* due.
|
||||
*/
|
||||
SubscriptionUpdate generateSubscriptionUpdate(ContactId c)
|
||||
SubscriptionUpdate generateSubscriptionUpdate(ContactId c, long maxLatency)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
@@ -139,11 +142,12 @@ public interface DatabaseComponent {
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Generates a batch of transport updates for the given contact. Returns
|
||||
* null if no updates are due.
|
||||
* Generates a batch of transport updates for the given contact, for
|
||||
* transmission over a transport with the given latency. Returns null if no
|
||||
* updates are due.
|
||||
*/
|
||||
Collection<TransportUpdate> generateTransportUpdates(ContactId c)
|
||||
throws DbException;
|
||||
Collection<TransportUpdate> generateTransportUpdates(ContactId c,
|
||||
long maxLatency) throws DbException;
|
||||
|
||||
/** Returns the configuration for the given transport. */
|
||||
TransportConfig getConfig(TransportId t) throws DbException;
|
||||
|
||||
@@ -25,6 +25,8 @@ import net.sf.briar.api.messaging.TransportUpdate;
|
||||
import net.sf.briar.api.transport.Endpoint;
|
||||
import net.sf.briar.api.transport.TemporarySecret;
|
||||
|
||||
// FIXME: Document the preconditions for calling each method
|
||||
|
||||
/**
|
||||
* A low-level interface to the database (DatabaseComponent provides a
|
||||
* high-level interface). Most operations take a transaction argument, which is
|
||||
@@ -103,15 +105,6 @@ interface Database<T> {
|
||||
*/
|
||||
void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Records the given messages as needing to be acknowledged by the given
|
||||
* expiry time.
|
||||
* <p>
|
||||
* Locking: contact read, message write.
|
||||
*/
|
||||
void addOutstandingMessages(T txn, ContactId c, Collection<MessageId> sent,
|
||||
long expiry) throws DbException;
|
||||
|
||||
/**
|
||||
* Stores the given message, or returns false if the message is already in
|
||||
* the database.
|
||||
@@ -308,9 +301,9 @@ interface Database<T> {
|
||||
byte[] getRawMessage(T txn, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the message identified by the given ID, in serialised form, or
|
||||
* null if the message is not present in the database or is not sendable to
|
||||
* the given contact.
|
||||
* Returns the message identified by the given ID, in serialised form.
|
||||
* Returns null if the message is not present in the database or is not
|
||||
* sendable to the given contact.
|
||||
* <p>
|
||||
* Locking: contact read, message read, subscription read.
|
||||
*/
|
||||
@@ -355,12 +348,13 @@ interface Database<T> {
|
||||
RetentionAck getRetentionAck(T txn, ContactId c) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns a retention update for the given contact, or null if no update
|
||||
* is due.
|
||||
* Returns a retention update for the given contact and updates its expiry
|
||||
* time using the given latency. Returns null if no update is due.
|
||||
* <p>
|
||||
* Locking: contact read, retention write.
|
||||
*/
|
||||
RetentionUpdate getRetentionUpdate(T txn, ContactId c) throws DbException;
|
||||
RetentionUpdate getRetentionUpdate(T txn, ContactId c, long maxLatency)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns all temporary secrets.
|
||||
@@ -416,13 +410,13 @@ interface Database<T> {
|
||||
SubscriptionAck getSubscriptionAck(T txn, ContactId c) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns a subscription update for the given contact, or null if no
|
||||
* update is due.
|
||||
* Returns a subscription update for the given contact and updates its
|
||||
* expiry time using the given latency. Returns null if no update is due.
|
||||
* <p>
|
||||
* Locking: contact read, subscription write.
|
||||
*/
|
||||
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c)
|
||||
throws DbException;
|
||||
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c,
|
||||
long maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns a collection of transport acks for the given contact, or null if
|
||||
@@ -434,13 +428,14 @@ interface Database<T> {
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns a collection of transport updates for the given contact, or
|
||||
* null if no updates are due.
|
||||
* Returns a collection of transport updates for the given contact and
|
||||
* updates their expiry times using the given latency. Returns null if no
|
||||
* updates are due.
|
||||
* <p>
|
||||
* Locking: contact read, transport write.
|
||||
*/
|
||||
Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c)
|
||||
throws DbException;
|
||||
Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c,
|
||||
long maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the version number of the
|
||||
@@ -572,6 +567,15 @@ interface Database<T> {
|
||||
void setConnectionWindow(T txn, ContactId c, TransportId t, long period,
|
||||
long centre, byte[] bitmap) throws DbException;
|
||||
|
||||
/**
|
||||
* Updates the expiry times of the given messages with respect to the given
|
||||
* contact, using the latency of the transport over which they were sent.
|
||||
* <p>
|
||||
* Locking: contact read, message write.
|
||||
*/
|
||||
void setMessageExpiry(T txn, ContactId c, Collection<MessageId> sent,
|
||||
long maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Sets the user's rating for the given author.
|
||||
* <p>
|
||||
|
||||
@@ -495,7 +495,7 @@ DatabaseCleaner.Callback {
|
||||
// Get some sendable messages from the database
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
messageLock.readLock().lock();
|
||||
messageLock.writeLock().lock();
|
||||
try {
|
||||
subscriptionLock.readLock().lock();
|
||||
try {
|
||||
@@ -515,17 +515,15 @@ DatabaseCleaner.Callback {
|
||||
subscriptionLock.readLock().unlock();
|
||||
}
|
||||
} finally {
|
||||
messageLock.readLock().unlock();
|
||||
messageLock.writeLock().unlock();
|
||||
}
|
||||
if(messages.isEmpty()) return null;
|
||||
// Calculate the expiry time of the messages
|
||||
long expiry = calculateExpiryTime(maxLatency);
|
||||
// Record the messages as sent
|
||||
messageLock.writeLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
db.addOutstandingMessages(txn, c, ids, expiry);
|
||||
db.setMessageExpiry(txn, c, ids, maxLatency);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -580,14 +578,12 @@ DatabaseCleaner.Callback {
|
||||
messageLock.readLock().unlock();
|
||||
}
|
||||
if(messages.isEmpty()) return null;
|
||||
// Calculate the expiry times of the messages
|
||||
long expiry = calculateExpiryTime(maxLatency);
|
||||
// Record the messages as sent
|
||||
messageLock.writeLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
db.addOutstandingMessages(txn, c, ids, expiry);
|
||||
db.setMessageExpiry(txn, c, ids, maxLatency);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -602,14 +598,6 @@ DatabaseCleaner.Callback {
|
||||
return Collections.unmodifiableList(messages);
|
||||
}
|
||||
|
||||
private long calculateExpiryTime(long maxLatency) {
|
||||
long roundTrip = maxLatency * 2;
|
||||
if(roundTrip < 0) roundTrip = Long.MAX_VALUE; // Overflow
|
||||
long expiry = clock.currentTimeMillis() + roundTrip;
|
||||
if(expiry < 0) expiry = Long.MAX_VALUE; // Overflow
|
||||
return expiry;
|
||||
}
|
||||
|
||||
public Offer generateOffer(ContactId c, int maxMessages)
|
||||
throws DbException {
|
||||
Collection<MessageId> offered;
|
||||
@@ -660,7 +648,7 @@ DatabaseCleaner.Callback {
|
||||
}
|
||||
}
|
||||
|
||||
public RetentionUpdate generateRetentionUpdate(ContactId c)
|
||||
public RetentionUpdate generateRetentionUpdate(ContactId c, long maxLatency)
|
||||
throws DbException {
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
@@ -670,7 +658,8 @@ DatabaseCleaner.Callback {
|
||||
try {
|
||||
if(!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
RetentionUpdate u = db.getRetentionUpdate(txn, c);
|
||||
RetentionUpdate u =
|
||||
db.getRetentionUpdate(txn, c, maxLatency);
|
||||
db.commitTransaction(txn);
|
||||
return u;
|
||||
} catch(DbException e) {
|
||||
@@ -710,8 +699,8 @@ DatabaseCleaner.Callback {
|
||||
}
|
||||
}
|
||||
|
||||
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c)
|
||||
throws DbException {
|
||||
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c,
|
||||
long maxLatency) throws DbException {
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
subscriptionLock.writeLock().lock();
|
||||
@@ -720,7 +709,8 @@ DatabaseCleaner.Callback {
|
||||
try {
|
||||
if(!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
SubscriptionUpdate u = db.getSubscriptionUpdate(txn, c);
|
||||
SubscriptionUpdate u =
|
||||
db.getSubscriptionUpdate(txn, c, maxLatency);
|
||||
db.commitTransaction(txn);
|
||||
return u;
|
||||
} catch(DbException e) {
|
||||
@@ -760,8 +750,8 @@ DatabaseCleaner.Callback {
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<TransportUpdate> generateTransportUpdates(ContactId c)
|
||||
throws DbException {
|
||||
public Collection<TransportUpdate> generateTransportUpdates(ContactId c,
|
||||
long maxLatency) throws DbException {
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
transportLock.writeLock().lock();
|
||||
@@ -771,7 +761,7 @@ DatabaseCleaner.Callback {
|
||||
if(!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
Collection<TransportUpdate> updates =
|
||||
db.getTransportUpdates(txn, c);
|
||||
db.getTransportUpdates(txn, c, maxLatency);
|
||||
db.commitTransaction(txn);
|
||||
return updates;
|
||||
} catch(DbException e) {
|
||||
|
||||
@@ -69,7 +69,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, subscription
|
||||
private static final String CREATE_GROUP_VISIBILITIES =
|
||||
"CREATE TABLE groupVisibilities"
|
||||
+ " (contactId INT NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " groupId HASH NOT NULL,"
|
||||
+ " FOREIGN KEY (contactId)"
|
||||
+ " REFERENCES contacts (contactId)"
|
||||
@@ -81,7 +81,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, subscription
|
||||
private static final String CREATE_CONTACT_GROUPS =
|
||||
"CREATE TABLE contactGroups"
|
||||
+ " (contactId INT NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " groupId HASH NOT NULL," // Not a foreign key
|
||||
+ " name VARCHAR NOT NULL,"
|
||||
+ " key BINARY," // Null for unrestricted groups
|
||||
@@ -93,11 +93,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, subscription
|
||||
private static final String CREATE_GROUP_VERSIONS =
|
||||
"CREATE TABLE groupVersions"
|
||||
+ " (contactId INT NOT NULL,"
|
||||
+ " localVersion BIGINT NOT NULL,"
|
||||
+ " localAcked BIGINT NOT NULL,"
|
||||
+ " remoteVersion BIGINT NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " localVersion BIGINT UNSIGNED NOT NULL,"
|
||||
+ " localAcked BIGINT UNSIGNED NOT NULL,"
|
||||
+ " remoteVersion BIGINT UNSIGNED NOT NULL,"
|
||||
+ " remoteAcked BOOLEAN NOT NULL,"
|
||||
+ " expiry BIGINT UNSIGNED NOT NULL,"
|
||||
+ " PRIMARY KEY (contactId),"
|
||||
+ " FOREIGN KEY (contactid)"
|
||||
+ " REFERENCES contacts (contactId)"
|
||||
@@ -111,13 +112,13 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " groupId HASH," // Null for private messages
|
||||
+ " authorId HASH," // Null for private or anonymous msgs
|
||||
+ " subject VARCHAR NOT NULL,"
|
||||
+ " timestamp BIGINT NOT NULL,"
|
||||
+ " length INT NOT NULL,"
|
||||
+ " bodyStart INT NOT NULL,"
|
||||
+ " bodyLength INT NOT NULL,"
|
||||
+ " timestamp BIGINT UNSIGNED NOT NULL,"
|
||||
+ " length INT UNSIGNED NOT NULL,"
|
||||
+ " bodyStart INT UNSIGNED NOT NULL,"
|
||||
+ " bodyLength INT UNSIGNED NOT NULL,"
|
||||
+ " raw BLOB NOT NULL,"
|
||||
+ " sendability INT," // Null for private messages
|
||||
+ " contactId INT," // Null for group messages
|
||||
+ " sendability INT UNSIGNED," // Null for private messages
|
||||
+ " contactId INT UNSIGNED," // Null for group messages
|
||||
+ " read BOOLEAN NOT NULL,"
|
||||
+ " starred BOOLEAN NOT NULL,"
|
||||
+ " PRIMARY KEY (messageId),"
|
||||
@@ -144,7 +145,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
private static final String CREATE_MESSAGES_TO_ACK =
|
||||
"CREATE TABLE messagesToAck"
|
||||
+ " (messageId HASH NOT NULL,"
|
||||
+ " contactId INT NOT NULL,"
|
||||
+ " contactId INT UNSIGNED NOT NULL,"
|
||||
+ " PRIMARY KEY (messageId, contactId),"
|
||||
+ " FOREIGN KEY (contactId)"
|
||||
+ " REFERENCES contacts (contactId)"
|
||||
@@ -154,9 +155,9 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
private static final String CREATE_STATUSES =
|
||||
"CREATE TABLE statuses"
|
||||
+ " (messageId HASH NOT NULL,"
|
||||
+ " contactId INT NOT NULL,"
|
||||
+ " contactId INT UNSIGNED NOT NULL,"
|
||||
+ " seen BOOLEAN NOT NULL,"
|
||||
+ " expiry BIGINT NOT NULL,"
|
||||
+ " expiry BIGINT UNSIGNED NOT NULL,"
|
||||
+ " PRIMARY KEY (messageId, contactId),"
|
||||
+ " FOREIGN KEY (messageId)"
|
||||
+ " REFERENCES messages (messageId)"
|
||||
@@ -181,12 +182,13 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, retention
|
||||
private static final String CREATE_RETENTION_VERSIONS =
|
||||
"CREATE TABLE retentionVersions"
|
||||
+ " (contactId INT NOT NULL,"
|
||||
+ " retention BIGINT NOT NULL,"
|
||||
+ " localVersion BIGINT NOT NULL,"
|
||||
+ " localAcked BIGINT NOT NULL,"
|
||||
+ " remoteVersion BIGINT NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " retention BIGINT UNSIGNED NOT NULL,"
|
||||
+ " localVersion BIGINT UNSIGNED NOT NULL,"
|
||||
+ " localAcked BIGINT UNSIGNED NOT NULL,"
|
||||
+ " remoteVersion BIGINT UNSIGNED NOT NULL,"
|
||||
+ " remoteAcked BOOLEAN NOT NULL,"
|
||||
+ " expiry BIGINT UNSIGNED NOT NULL,"
|
||||
+ " PRIMARY KEY (contactId),"
|
||||
+ " FOREIGN KEY (contactId)"
|
||||
+ " REFERENCES contacts (contactId)"
|
||||
@@ -221,10 +223,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, transport
|
||||
private static final String CREATE_TRANSPORT_VERSIONS =
|
||||
"CREATE TABLE transportVersions"
|
||||
+ " (contactId HASH NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " transportId HASH NOT NULL,"
|
||||
+ " localVersion BIGINT NOT NULL,"
|
||||
+ " localAcked BIGINT NOT NULL,"
|
||||
+ " localVersion BIGINT UNSIGNED NOT NULL,"
|
||||
+ " localAcked BIGINT UNSIGNED NOT NULL,"
|
||||
+ " expiry BIGINT UNSIGNED NOT NULL,"
|
||||
+ " PRIMARY KEY (contactId, transportId),"
|
||||
+ " FOREIGN KEY (contactId)"
|
||||
+ " REFERENCES contacts (contactId)"
|
||||
@@ -236,7 +239,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, transport
|
||||
private static final String CREATE_CONTACT_TRANSPORT_PROPS =
|
||||
"CREATE TABLE contactTransportProperties"
|
||||
+ " (contactId INT NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " transportId HASH NOT NULL," // Not a foreign key
|
||||
+ " key VARCHAR NOT NULL,"
|
||||
+ " value VARCHAR NOT NULL,"
|
||||
@@ -248,9 +251,9 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, transport
|
||||
private static final String CREATE_CONTACT_TRANSPORT_VERSIONS =
|
||||
"CREATE TABLE contactTransportVersions"
|
||||
+ " (contactId HASH NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " transportId HASH NOT NULL," // Not a foreign key
|
||||
+ " remoteVersion BIGINT NOT NULL,"
|
||||
+ " remoteVersion BIGINT UNSIGNED NOT NULL,"
|
||||
+ " remoteAcked BOOLEAN NOT NULL,"
|
||||
+ " PRIMARY KEY (contactId, transportId),"
|
||||
+ " FOREIGN KEY (contactId)"
|
||||
@@ -260,11 +263,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, transport read, window
|
||||
private static final String CREATE_ENDPOINTS =
|
||||
"CREATE TABLE endpoints"
|
||||
+ " (contactId INT NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " transportId HASH NOT NULL,"
|
||||
+ " epoch BIGINT NOT NULL,"
|
||||
+ " clockDiff BIGINT NOT NULL,"
|
||||
+ " latency BIGINT NOT NULL,"
|
||||
+ " epoch BIGINT UNSIGNED NOT NULL,"
|
||||
+ " clockDiff BIGINT UNSIGNED NOT NULL,"
|
||||
+ " latency BIGINT UNSIGNED NOT NULL,"
|
||||
+ " alice BOOLEAN NOT NULL,"
|
||||
+ " PRIMARY KEY (contactId, transportId),"
|
||||
+ " FOREIGN KEY (contactId)"
|
||||
@@ -277,12 +280,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
// Locking: contact read, transport read, window
|
||||
private static final String CREATE_SECRETS =
|
||||
"CREATE TABLE secrets"
|
||||
+ " (contactId INT NOT NULL,"
|
||||
+ " (contactId INT UNSIGNED NOT NULL,"
|
||||
+ " transportId HASH NOT NULL,"
|
||||
+ " period BIGINT NOT NULL,"
|
||||
+ " period BIGINT UNSIGNED NOT NULL,"
|
||||
+ " secret SECRET NOT NULL,"
|
||||
+ " outgoing BIGINT NOT NULL,"
|
||||
+ " centre BIGINT NOT NULL,"
|
||||
+ " outgoing BIGINT UNSIGNED NOT NULL,"
|
||||
+ " centre BIGINT UNSIGNED NOT NULL,"
|
||||
+ " bitmap BINARY NOT NULL,"
|
||||
+ " PRIMARY KEY (contactId, transportId, period),"
|
||||
+ " FOREIGN KEY (contactId)"
|
||||
@@ -505,9 +508,10 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
rs.close();
|
||||
ps.close();
|
||||
// Create a retention version row
|
||||
sql = "INSERT INTO retentionVersions (contactId, retention,"
|
||||
+ " localVersion, localAcked, remoteVersion, remoteAcked)"
|
||||
+ " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE)";
|
||||
sql = "INSERT INTO retentionVersions"
|
||||
+ " (contactId, retention, localVersion, localAcked,"
|
||||
+ " remoteVersion, remoteAcked, expiry)"
|
||||
+ " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE, ZERO())";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, 1);
|
||||
@@ -516,8 +520,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
ps.close();
|
||||
// Create a group version row
|
||||
sql = "INSERT INTO groupVersions (contactId, localVersion,"
|
||||
+ " localAcked, remoteVersion, remoteAcked)"
|
||||
+ " VALUES (?, ?, ZERO(), ZERO(), TRUE)";
|
||||
+ " localAcked, remoteVersion, remoteAcked, expiry)"
|
||||
+ " VALUES (?, ?, ZERO(), ZERO(), TRUE, ZERO())";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, 1);
|
||||
@@ -533,9 +537,9 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
rs.close();
|
||||
ps.close();
|
||||
if(transports.isEmpty()) return c;
|
||||
sql = "INSERT INTO transportVersions"
|
||||
+ " (contactId, transportId, localVersion, localAcked)"
|
||||
+ " VALUES (?, ?, ?, ZERO())";
|
||||
sql = "INSERT INTO transportVersions (contactId, transportId,"
|
||||
+ " localVersion, localAcked, expiry)"
|
||||
+ " VALUES (?, ?, ?, ZERO(), ZERO())";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(3, 1);
|
||||
@@ -646,33 +650,6 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public void addOutstandingMessages(Connection txn, ContactId c,
|
||||
Collection<MessageId> sent, long expiry) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
// Update the expiry time of each message
|
||||
String sql = "UPDATE statuses SET expiry = ?"
|
||||
+ " WHERE messageId = ? AND contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, expiry);
|
||||
ps.setInt(3, c.getInt());
|
||||
for(MessageId m : sent) {
|
||||
ps.setBytes(2, m.getBytes());
|
||||
ps.addBatch();
|
||||
}
|
||||
int[] batchAffected = ps.executeBatch();
|
||||
if(batchAffected.length != sent.size())
|
||||
throw new DbStateException();
|
||||
for(int i = 0; i < batchAffected.length; i++) {
|
||||
if(batchAffected[i] > 1) throw new DbStateException();
|
||||
}
|
||||
ps.close();
|
||||
} catch(SQLException e) {
|
||||
tryToClose(ps);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean addPrivateMessage(Connection txn, Message m, ContactId c)
|
||||
throws DbException {
|
||||
assert m.getGroup() == null;
|
||||
@@ -812,9 +789,9 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
rs.close();
|
||||
ps.close();
|
||||
if(contacts.isEmpty()) return;
|
||||
sql = "INSERT INTO transportVersions"
|
||||
+ " (contactId, transportId, localVersion, localAcked)"
|
||||
+ " VALUES (?, ?, ?, ZERO())";
|
||||
sql = "INSERT INTO transportVersions (contactId, transportId,"
|
||||
+ " localVersion, localAcked, expiry)"
|
||||
+ " VALUES (?, ?, ?, ZERO(), ZERO())";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setBytes(2, t.getBytes());
|
||||
ps.setInt(3, 1);
|
||||
@@ -848,7 +825,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
if(affected != 1) throw new DbStateException();
|
||||
ps.close();
|
||||
// Bump the subscription version
|
||||
sql = "UPDATE groupVersions SET localVersion = localVersion + ?"
|
||||
sql = "UPDATE groupVersions"
|
||||
+ " SET localVersion = localVersion + ?, expiry = ZERO()"
|
||||
+ " WHERE contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, 1);
|
||||
@@ -1444,7 +1422,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " WHERE m.messageId = ?"
|
||||
+ " AND cg.contactId = ?"
|
||||
+ " AND timestamp >= retention"
|
||||
+ " AND seen = FALSE AND expiry < ?"
|
||||
+ " AND seen = FALSE AND s.expiry < ?"
|
||||
+ " AND sendability > ZERO()";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setBytes(1, m.getBytes());
|
||||
@@ -1557,19 +1535,23 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c)
|
||||
throws DbException {
|
||||
public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c,
|
||||
long maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT timestamp, localVersion"
|
||||
+ " FROM messages AS m"
|
||||
+ " JOIN retentionVersions AS rv"
|
||||
+ " WHERE rv.contactId = ? AND localVersion > localAcked"
|
||||
+ " WHERE rv.contactId = ?"
|
||||
+ " AND localVersion > localAcked"
|
||||
+ " AND expiry < ?"
|
||||
+ " ORDER BY timestamp LIMIT ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, 1);
|
||||
ps.setLong(2, now);
|
||||
ps.setInt(3, 1);
|
||||
rs = ps.executeQuery();
|
||||
if(!rs.next()) {
|
||||
rs.close();
|
||||
@@ -1582,6 +1564,13 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
if(rs.next()) throw new DbStateException();
|
||||
rs.close();
|
||||
ps.close();
|
||||
sql = "UPDATE retentionVersions SET expiry = ? WHERE contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, calculateExpiry(now, maxLatency));
|
||||
ps.setInt(2, c.getInt());
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected != 1) throw new DbStateException();
|
||||
ps.close();
|
||||
return new RetentionUpdate(retention, version);
|
||||
} catch(SQLException e) {
|
||||
tryToClose(ps);
|
||||
@@ -1692,7 +1681,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " AND cg.contactId = s.contactId"
|
||||
+ " WHERE cg.contactId = ?"
|
||||
+ " AND timestamp >= retention"
|
||||
+ " AND seen = FALSE AND expiry < ?"
|
||||
+ " AND seen = FALSE AND s.expiry < ?"
|
||||
+ " AND sendability > ZERO()"
|
||||
+ " ORDER BY timestamp DESC";
|
||||
ps = txn.prepareStatement(sql);
|
||||
@@ -1822,8 +1811,9 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public SubscriptionUpdate getSubscriptionUpdate(Connection txn, ContactId c)
|
||||
throws DbException {
|
||||
public SubscriptionUpdate getSubscriptionUpdate(Connection txn, ContactId c,
|
||||
long maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
@@ -1834,9 +1824,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " JOIN groupVersions AS ver"
|
||||
+ " ON vis.contactId = ver.contactId"
|
||||
+ " WHERE vis.contactId = ?"
|
||||
+ " AND localVersion > localAcked";
|
||||
+ " AND localVersion > localAcked"
|
||||
+ " AND expiry < ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setLong(2, now);
|
||||
rs = ps.executeQuery();
|
||||
List<Group> subs = new ArrayList<Group>();
|
||||
long version = 0;
|
||||
@@ -1850,6 +1842,13 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
rs.close();
|
||||
ps.close();
|
||||
if(subs.isEmpty()) return null;
|
||||
sql = "UPDATE groupVersions SET expiry = ? WHERE contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, calculateExpiry(now, maxLatency));
|
||||
ps.setInt(2, c.getInt());
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected != 1) throw new DbStateException();
|
||||
ps.close();
|
||||
subs = Collections.unmodifiableList(subs);
|
||||
return new SubscriptionUpdate(subs, version);
|
||||
} catch(SQLException e) {
|
||||
@@ -1902,7 +1901,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
|
||||
public Collection<TransportUpdate> getTransportUpdates(Connection txn,
|
||||
ContactId c) throws DbException {
|
||||
ContactId c, long maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
@@ -1911,9 +1911,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " JOIN transportVersions AS tv"
|
||||
+ " ON tp.transportId = tv.transportId"
|
||||
+ " WHERE tv.contactId = ?"
|
||||
+ " AND localVersion > localAcked";
|
||||
+ " AND localVersion > localAcked"
|
||||
+ " AND expiry < ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setLong(2, now);
|
||||
rs = ps.executeQuery();
|
||||
List<TransportUpdate> updates = new ArrayList<TransportUpdate>();
|
||||
TransportId lastId = null;
|
||||
@@ -1931,6 +1933,22 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
rs.close();
|
||||
ps.close();
|
||||
if(updates.isEmpty()) return null;
|
||||
sql = "UPDATE transportVersions SET expiry = ?"
|
||||
+ " WHERE contactId = ? AND transportId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, calculateExpiry(now, maxLatency));
|
||||
ps.setInt(2, c.getInt());
|
||||
for(TransportUpdate u : updates) {
|
||||
ps.setBytes(3, u.getId().getBytes());
|
||||
ps.addBatch();
|
||||
}
|
||||
int [] batchAffected = ps.executeBatch();
|
||||
if(batchAffected.length != updates.size())
|
||||
throw new DbStateException();
|
||||
for(int i = 0; i < batchAffected.length; i++) {
|
||||
if(batchAffected[i] != 1) throw new DbStateException();
|
||||
}
|
||||
ps.close();
|
||||
return Collections.unmodifiableList(updates);
|
||||
} catch(SQLException e) {
|
||||
tryToClose(ps);
|
||||
@@ -2045,7 +2063,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " AND cg.contactId = s.contactId"
|
||||
+ " WHERE cg.contactId = ?"
|
||||
+ " AND timestamp >= retention"
|
||||
+ " AND seen = FALSE AND expiry < ?"
|
||||
+ " AND seen = FALSE AND s.expiry < ?"
|
||||
+ " AND sendability > ZERO()"
|
||||
+ " LIMIT ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
@@ -2110,7 +2128,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
String sql = "UPDATE retentionVersions"
|
||||
+ " SET localVersion = localVersion + ?";
|
||||
+ " SET localVersion = localVersion + ?, expiry = ZERO()";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, 1);
|
||||
ps.executeUpdate();
|
||||
@@ -2226,7 +2244,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
ps.close();
|
||||
if(visible.isEmpty()) return;
|
||||
// Bump the subscription version for the affected contacts
|
||||
sql = "UPDATE groupVersions SET localVersion = localVersion + ?"
|
||||
sql = "UPDATE groupVersions"
|
||||
+ " SET localVersion = localVersion + ?, expiry = ZERO()"
|
||||
+ " WHERE contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, 1);
|
||||
@@ -2277,7 +2296,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
if(affected != 1) throw new DbStateException();
|
||||
ps.close();
|
||||
// Bump the subscription version
|
||||
sql = "UPDATE groupVersions SET localVersion = localVersion + ?"
|
||||
sql = "UPDATE groupVersions"
|
||||
+ " SET localVersion = localVersion + ?, expiry = ZERO()"
|
||||
+ " WHERE contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, 1);
|
||||
@@ -2305,7 +2325,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
String sql = "UPDATE transportVersions"
|
||||
+ " SET localVersion = localVersion + ?"
|
||||
+ " SET localVersion = localVersion + ?, expiry = ZERO()"
|
||||
+ " WHERE transportId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, 1);
|
||||
@@ -2385,20 +2405,26 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public void setRetentionTime(Connection txn, ContactId c, long retention,
|
||||
long version) throws DbException {
|
||||
public void setMessageExpiry(Connection txn, ContactId c,
|
||||
Collection<MessageId> sent, long maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
String sql = "UPDATE retentionVersions SET retention = ?,"
|
||||
+ " remoteVersion = ?, remoteAcked = FALSE"
|
||||
+ " WHERE contactId = ? AND remoteVersion < ?";
|
||||
String sql = "UPDATE statuses SET expiry = ?"
|
||||
+ " WHERE messageId = ? AND contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, retention);
|
||||
ps.setLong(2, version);
|
||||
ps.setLong(1, calculateExpiry(now, maxLatency));
|
||||
ps.setInt(3, c.getInt());
|
||||
ps.setLong(4, version);
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected > 1) throw new DbStateException();
|
||||
for(MessageId m : sent) {
|
||||
ps.setBytes(2, m.getBytes());
|
||||
ps.addBatch();
|
||||
}
|
||||
int[] batchAffected = ps.executeBatch();
|
||||
if(batchAffected.length != sent.size())
|
||||
throw new DbStateException();
|
||||
for(int i = 0; i < batchAffected.length; i++) {
|
||||
if(batchAffected[i] > 1) throw new DbStateException();
|
||||
}
|
||||
ps.close();
|
||||
} catch(SQLException e) {
|
||||
tryToClose(ps);
|
||||
@@ -2406,25 +2432,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public void setRetentionUpdateAcked(Connection txn, ContactId c,
|
||||
long version) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
String sql = "UPDATE retentionVersions SET localAcked = ?"
|
||||
+ " WHERE contactId = ?"
|
||||
+ " AND localAcked < ? AND localVersion >= ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, version);
|
||||
ps.setInt(2, c.getInt());
|
||||
ps.setLong(3, version);
|
||||
ps.setLong(4, version);
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected > 1) throw new DbStateException();
|
||||
ps.close();
|
||||
} catch(SQLException e) {
|
||||
tryToClose(ps);
|
||||
throw new DbException(e);
|
||||
}
|
||||
private long calculateExpiry(long now, long maxLatency) {
|
||||
long roundTrip = maxLatency * 2;
|
||||
if(roundTrip < 0) return Long.MAX_VALUE; // Overflow;
|
||||
long expiry = now + roundTrip;
|
||||
if(expiry < 0) return Long.MAX_VALUE; // Overflow
|
||||
return expiry;
|
||||
}
|
||||
|
||||
public Rating setRating(Connection txn, AuthorId a, Rating r)
|
||||
@@ -2586,6 +2599,48 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public void setRetentionTime(Connection txn, ContactId c, long retention,
|
||||
long version) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
String sql = "UPDATE retentionVersions SET retention = ?,"
|
||||
+ " remoteVersion = ?, remoteAcked = FALSE"
|
||||
+ " WHERE contactId = ? AND remoteVersion < ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, retention);
|
||||
ps.setLong(2, version);
|
||||
ps.setInt(3, c.getInt());
|
||||
ps.setLong(4, version);
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected > 1) throw new DbStateException();
|
||||
ps.close();
|
||||
} catch(SQLException e) {
|
||||
tryToClose(ps);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void setRetentionUpdateAcked(Connection txn, ContactId c,
|
||||
long version) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
String sql = "UPDATE retentionVersions SET localAcked = ?"
|
||||
+ " WHERE contactId = ?"
|
||||
+ " AND localAcked < ? AND localVersion >= ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setLong(1, version);
|
||||
ps.setInt(2, c.getInt());
|
||||
ps.setLong(3, version);
|
||||
ps.setLong(4, version);
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected > 1) throw new DbStateException();
|
||||
ps.close();
|
||||
} catch(SQLException e) {
|
||||
tryToClose(ps);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void setSendability(Connection txn, MessageId m, int sendability)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
|
||||
@@ -3,6 +3,7 @@ package net.sf.briar.messaging.duplex;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static net.sf.briar.api.Rating.GOOD;
|
||||
import static net.sf.briar.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@@ -87,6 +88,7 @@ abstract class DuplexConnection implements DatabaseListener {
|
||||
|
||||
private final Executor dbExecutor, verificationExecutor;
|
||||
private final MessageVerifier messageVerifier;
|
||||
private final long maxLatency;
|
||||
private final AtomicBoolean canSendOffer, disposed;
|
||||
private final BlockingQueue<Runnable> writerTasks;
|
||||
|
||||
@@ -116,6 +118,7 @@ abstract class DuplexConnection implements DatabaseListener {
|
||||
this.transport = transport;
|
||||
contactId = ctx.getContactId();
|
||||
transportId = ctx.getTransportId();
|
||||
maxLatency = transport.getMaxLatency();
|
||||
canSendOffer = new AtomicBoolean(false);
|
||||
disposed = new AtomicBoolean(false);
|
||||
writerTasks = new LinkedBlockingQueue<Runnable>();
|
||||
@@ -467,8 +470,7 @@ abstract class DuplexConnection implements DatabaseListener {
|
||||
assert writer != null;
|
||||
try {
|
||||
Collection<byte[]> batch = db.generateBatch(contactId,
|
||||
Integer.MAX_VALUE, transport.getMaxLatency(),
|
||||
requested);
|
||||
MAX_PACKET_LENGTH, maxLatency, requested);
|
||||
if(batch == null) new GenerateOffer().run();
|
||||
else writerTasks.add(new WriteBatch(batch, requested));
|
||||
} catch(DbException e) {
|
||||
@@ -583,7 +585,8 @@ abstract class DuplexConnection implements DatabaseListener {
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
RetentionUpdate u = db.generateRetentionUpdate(contactId);
|
||||
RetentionUpdate u =
|
||||
db.generateRetentionUpdate(contactId, maxLatency);
|
||||
if(u != null) writerTasks.add(new WriteRetentionUpdate(u));
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
@@ -649,7 +652,8 @@ abstract class DuplexConnection implements DatabaseListener {
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId);
|
||||
SubscriptionUpdate u =
|
||||
db.generateSubscriptionUpdate(contactId, maxLatency);
|
||||
if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
@@ -717,7 +721,7 @@ abstract class DuplexConnection implements DatabaseListener {
|
||||
public void run() {
|
||||
try {
|
||||
Collection<TransportUpdate> t =
|
||||
db.generateTransportUpdates(contactId);
|
||||
db.generateTransportUpdates(contactId, maxLatency);
|
||||
if(t != null) writerTasks.add(new WriteTransportUpdates(t));
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
|
||||
@@ -42,6 +42,7 @@ class OutgoingSimplexConnection {
|
||||
private final SimplexTransportWriter transport;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final long maxLatency;
|
||||
|
||||
OutgoingSimplexConnection(DatabaseComponent db,
|
||||
ConnectionRegistry connRegistry,
|
||||
@@ -56,6 +57,7 @@ class OutgoingSimplexConnection {
|
||||
this.transport = transport;
|
||||
contactId = ctx.getContactId();
|
||||
transportId = ctx.getTransportId();
|
||||
maxLatency = transport.getMaxLatency();
|
||||
}
|
||||
|
||||
void write() {
|
||||
@@ -69,7 +71,6 @@ class OutgoingSimplexConnection {
|
||||
throw new EOFException();
|
||||
PacketWriter writer = packetWriterFactory.createPacketWriter(out,
|
||||
transport.shouldFlush());
|
||||
long maxLatency = transport.getMaxLatency();
|
||||
// Send the initial packets: updates and acks
|
||||
boolean hasSpace = writeTransportAcks(conn, writer);
|
||||
if(hasSpace) hasSpace = writeTransportUpdates(conn, writer);
|
||||
@@ -128,7 +129,7 @@ class OutgoingSimplexConnection {
|
||||
PacketWriter writer) throws DbException, IOException {
|
||||
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
|
||||
Collection<TransportUpdate> updates =
|
||||
db.generateTransportUpdates(contactId);
|
||||
db.generateTransportUpdates(contactId, maxLatency);
|
||||
if(updates == null) return true;
|
||||
for(TransportUpdate u : updates) {
|
||||
writer.writeTransportUpdate(u);
|
||||
@@ -149,7 +150,8 @@ class OutgoingSimplexConnection {
|
||||
private boolean writeSubscriptionUpdate(ConnectionWriter conn,
|
||||
PacketWriter writer) throws DbException, IOException {
|
||||
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
|
||||
SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId);
|
||||
SubscriptionUpdate u =
|
||||
db.generateSubscriptionUpdate(contactId, maxLatency);
|
||||
if(u == null) return true;
|
||||
writer.writeSubscriptionUpdate(u);
|
||||
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
|
||||
@@ -167,7 +169,7 @@ class OutgoingSimplexConnection {
|
||||
private boolean writeRetentionUpdate(ConnectionWriter conn,
|
||||
PacketWriter writer) throws DbException, IOException {
|
||||
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
|
||||
RetentionUpdate u = db.generateRetentionUpdate(contactId);
|
||||
RetentionUpdate u = db.generateRetentionUpdate(contactId, maxLatency);
|
||||
if(u == null) return true;
|
||||
writer.writeRetentionUpdate(u);
|
||||
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
|
||||
|
||||
@@ -523,7 +523,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
} catch(NoSuchContactException expected) {}
|
||||
|
||||
try {
|
||||
db.generateRetentionUpdate(contactId);
|
||||
db.generateRetentionUpdate(contactId, 123);
|
||||
fail();
|
||||
} catch(NoSuchContactException expected) {}
|
||||
|
||||
@@ -533,7 +533,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
} catch(NoSuchContactException expected) {}
|
||||
|
||||
try {
|
||||
db.generateSubscriptionUpdate(contactId);
|
||||
db.generateSubscriptionUpdate(contactId, 123);
|
||||
fail();
|
||||
} catch(NoSuchContactException expected) {}
|
||||
|
||||
@@ -543,7 +543,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
} catch(NoSuchContactException expected) {}
|
||||
|
||||
try {
|
||||
db.generateTransportUpdates(contactId);
|
||||
db.generateTransportUpdates(contactId, 123);
|
||||
fail();
|
||||
} catch(NoSuchContactException expected) {}
|
||||
|
||||
@@ -696,7 +696,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
oneOf(database).getRawMessage(txn, messageId1);
|
||||
will(returnValue(raw1));
|
||||
// Record the outstanding messages
|
||||
oneOf(database).addOutstandingMessages(txn, contactId, sendable,
|
||||
oneOf(database).setMessageExpiry(txn, contactId, sendable,
|
||||
Long.MAX_VALUE);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, cleaner,
|
||||
@@ -733,8 +733,8 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
will(returnValue(raw1)); // Message is sendable
|
||||
oneOf(database).getRawMessageIfSendable(txn, contactId, messageId2);
|
||||
will(returnValue(null)); // Message is not sendable
|
||||
// Record the outstanding message
|
||||
oneOf(database).addOutstandingMessages(txn, contactId,
|
||||
// Mark the message as sent
|
||||
oneOf(database).setMessageExpiry(txn, contactId,
|
||||
Arrays.asList(messageId1), Long.MAX_VALUE);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, cleaner,
|
||||
@@ -788,13 +788,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
allowing(database).commitTransaction(txn);
|
||||
allowing(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getSubscriptionUpdate(txn, contactId);
|
||||
oneOf(database).getSubscriptionUpdate(txn, contactId,
|
||||
Long.MAX_VALUE);
|
||||
will(returnValue(null));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, cleaner,
|
||||
shutdown);
|
||||
|
||||
assertNull(db.generateSubscriptionUpdate(contactId));
|
||||
assertNull(db.generateSubscriptionUpdate(contactId, Long.MAX_VALUE));
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
@@ -812,13 +813,15 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
allowing(database).commitTransaction(txn);
|
||||
allowing(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getSubscriptionUpdate(txn, contactId);
|
||||
oneOf(database).getSubscriptionUpdate(txn, contactId,
|
||||
Long.MAX_VALUE);
|
||||
will(returnValue(new SubscriptionUpdate(Arrays.asList(group), 1)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, cleaner,
|
||||
shutdown);
|
||||
|
||||
SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId);
|
||||
SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId,
|
||||
Long.MAX_VALUE);
|
||||
assertEquals(Arrays.asList(group), u.getGroups());
|
||||
assertEquals(1, u.getVersion());
|
||||
|
||||
@@ -838,13 +841,13 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
allowing(database).commitTransaction(txn);
|
||||
allowing(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getTransportUpdates(txn, contactId);
|
||||
oneOf(database).getTransportUpdates(txn, contactId, Long.MAX_VALUE);
|
||||
will(returnValue(null));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, cleaner,
|
||||
shutdown);
|
||||
|
||||
assertNull(db.generateTransportUpdates(contactId));
|
||||
assertNull(db.generateTransportUpdates(contactId, Long.MAX_VALUE));
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
@@ -862,15 +865,15 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
|
||||
allowing(database).commitTransaction(txn);
|
||||
allowing(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getTransportUpdates(txn, contactId);
|
||||
oneOf(database).getTransportUpdates(txn, contactId, Long.MAX_VALUE);
|
||||
will(returnValue(Arrays.asList(new TransportUpdate(transportId,
|
||||
transportProperties, 1))));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, cleaner,
|
||||
shutdown);
|
||||
|
||||
Collection<TransportUpdate> updates = db.generateTransportUpdates(
|
||||
contactId);
|
||||
Collection<TransportUpdate> updates =
|
||||
db.generateTransportUpdates(contactId, Long.MAX_VALUE);
|
||||
assertNotNull(updates);
|
||||
assertEquals(1, updates.size());
|
||||
TransportUpdate u = updates.iterator().next();
|
||||
|
||||
@@ -523,7 +523,7 @@ public class H2DatabaseTest extends BriarTestCase {
|
||||
assertTrue(it.hasNext());
|
||||
assertEquals(messageId, it.next());
|
||||
assertFalse(it.hasNext());
|
||||
db.addOutstandingMessages(txn, contactId, Arrays.asList(messageId),
|
||||
db.setMessageExpiry(txn, contactId, Arrays.asList(messageId),
|
||||
Long.MAX_VALUE);
|
||||
|
||||
// The message should no longer be sendable
|
||||
|
||||
@@ -113,19 +113,22 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
|
||||
oneOf(db).generateTransportAcks(contactId);
|
||||
will(returnValue(null));
|
||||
// No transport updates to send
|
||||
oneOf(db).generateTransportUpdates(contactId);
|
||||
oneOf(db).generateTransportUpdates(with(contactId),
|
||||
with(any(long.class)));
|
||||
will(returnValue(null));
|
||||
// No subscription ack to send
|
||||
oneOf(db).generateSubscriptionAck(contactId);
|
||||
will(returnValue(null));
|
||||
// No subscription update to send
|
||||
oneOf(db).generateSubscriptionUpdate(contactId);
|
||||
oneOf(db).generateSubscriptionUpdate(with(contactId),
|
||||
with(any(long.class)));
|
||||
will(returnValue(null));
|
||||
// No retention ack to send
|
||||
oneOf(db).generateRetentionAck(contactId);
|
||||
will(returnValue(null));
|
||||
// No retention update to send
|
||||
oneOf(db).generateRetentionUpdate(contactId);
|
||||
oneOf(db).generateRetentionUpdate(with(contactId),
|
||||
with(any(long.class)));
|
||||
will(returnValue(null));
|
||||
// No acks to send
|
||||
oneOf(db).generateAck(with(contactId), with(any(int.class)));
|
||||
@@ -160,19 +163,22 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
|
||||
oneOf(db).generateTransportAcks(contactId);
|
||||
will(returnValue(null));
|
||||
// No transport updates to send
|
||||
oneOf(db).generateTransportUpdates(contactId);
|
||||
oneOf(db).generateTransportUpdates(with(contactId),
|
||||
with(any(long.class)));
|
||||
will(returnValue(null));
|
||||
// No subscription ack to send
|
||||
oneOf(db).generateSubscriptionAck(contactId);
|
||||
will(returnValue(null));
|
||||
// No subscription update to send
|
||||
oneOf(db).generateSubscriptionUpdate(contactId);
|
||||
oneOf(db).generateSubscriptionUpdate(with(contactId),
|
||||
with(any(long.class)));
|
||||
will(returnValue(null));
|
||||
// No retention ack to send
|
||||
oneOf(db).generateRetentionAck(contactId);
|
||||
will(returnValue(null));
|
||||
// No retention update to send
|
||||
oneOf(db).generateRetentionUpdate(contactId);
|
||||
oneOf(db).generateRetentionUpdate(with(contactId),
|
||||
with(any(long.class)));
|
||||
will(returnValue(null));
|
||||
// One ack to send
|
||||
oneOf(db).generateAck(with(contactId), with(any(int.class)));
|
||||
|
||||
Reference in New Issue
Block a user