mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-11 18:29:05 +01:00
Add database methods for selecting small messages.
This commit is contained in:
@@ -163,19 +163,23 @@ public interface DatabaseComponent extends TransactionManager {
|
||||
* less than or equal to the given length, for transmission over a
|
||||
* transport with the given maximum latency. Returns null if there are no
|
||||
* sendable messages that fit in the given length.
|
||||
*
|
||||
* @param small True if only single-block messages should be sent
|
||||
*/
|
||||
@Nullable
|
||||
Collection<Message> generateBatch(Transaction txn, ContactId c,
|
||||
int maxLength, int maxLatency) throws DbException;
|
||||
int maxLength, int maxLatency, boolean small) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns an offer for the given contact for transmission over a
|
||||
* transport with the given maximum latency, or null if there are no
|
||||
* messages to offer.
|
||||
*
|
||||
* @param small True if only single-block messages should be offered
|
||||
*/
|
||||
@Nullable
|
||||
Offer generateOffer(Transaction txn, ContactId c, int maxMessages,
|
||||
int maxLatency) throws DbException;
|
||||
int maxLatency, boolean small) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns a request for the given contact, or null if there are no
|
||||
|
||||
@@ -29,10 +29,15 @@ public interface SyncConstants {
|
||||
*/
|
||||
int MESSAGE_HEADER_LENGTH = UniqueId.LENGTH + 8;
|
||||
|
||||
/**
|
||||
* The maximum length of a block in bytes.
|
||||
*/
|
||||
int MAX_BLOCK_LENGTH = 32 * 1024; // 32 KiB
|
||||
|
||||
/**
|
||||
* The maximum length of a message body in bytes.
|
||||
*/
|
||||
int MAX_MESSAGE_BODY_LENGTH = 32 * 1024; // 32 KiB
|
||||
int MAX_MESSAGE_BODY_LENGTH = MAX_BLOCK_LENGTH;
|
||||
|
||||
/**
|
||||
* The maximum length of a message in bytes.
|
||||
|
||||
@@ -456,6 +456,15 @@ interface Database<T> {
|
||||
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
|
||||
int maxMessages, int maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of some single-block messages that are eligible to be
|
||||
* offered to the given contact, up to the given number of messages.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*/
|
||||
Collection<MessageId> getSmallMessagesToOffer(T txn, ContactId c,
|
||||
int maxMessages, int maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of some messages that are eligible to be requested from
|
||||
* the given contact, up to the given number of messages.
|
||||
@@ -474,6 +483,15 @@ interface Database<T> {
|
||||
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
|
||||
int maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of some single-block messages that are eligible to be
|
||||
* sent to the given contact, up to the given total length.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*/
|
||||
Collection<MessageId> getSmallMessagesToSend(T txn, ContactId c,
|
||||
int maxLength, int maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of any messages that need to be validated.
|
||||
* <p/>
|
||||
|
||||
@@ -406,13 +406,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
@Nullable
|
||||
@Override
|
||||
public Collection<Message> generateBatch(Transaction transaction,
|
||||
ContactId c, int maxLength, int maxLatency) throws DbException {
|
||||
ContactId c, int maxLength, int maxLatency, boolean small)
|
||||
throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
Collection<MessageId> ids =
|
||||
db.getMessagesToSend(txn, c, maxLength, maxLatency);
|
||||
Collection<MessageId> ids;
|
||||
if (small)
|
||||
ids = db.getSmallMessagesToSend(txn, c, maxLength, maxLatency);
|
||||
else ids = db.getMessagesToSend(txn, c, maxLength, maxLatency);
|
||||
List<Message> messages = new ArrayList<>(ids.size());
|
||||
for (MessageId m : ids) {
|
||||
messages.add(db.getMessage(txn, m));
|
||||
@@ -427,13 +430,15 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
@Nullable
|
||||
@Override
|
||||
public Offer generateOffer(Transaction transaction, ContactId c,
|
||||
int maxMessages, int maxLatency) throws DbException {
|
||||
int maxMessages, int maxLatency, boolean small) throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
Collection<MessageId> ids =
|
||||
db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
|
||||
Collection<MessageId> ids;
|
||||
if (small)
|
||||
ids = db.getSmallMessagesToOffer(txn, c, maxMessages, maxLatency);
|
||||
else ids = db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
|
||||
if (ids.isEmpty()) return null;
|
||||
for (MessageId m : ids)
|
||||
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
|
||||
@@ -448,8 +453,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
Collection<MessageId> ids = db.getMessagesToRequest(txn, c,
|
||||
maxMessages);
|
||||
Collection<MessageId> ids =
|
||||
db.getMessagesToRequest(txn, c, maxMessages);
|
||||
if (ids.isEmpty()) return null;
|
||||
db.removeOfferedMessages(txn, c, ids);
|
||||
return new Request(ids);
|
||||
|
||||
@@ -75,6 +75,8 @@ import static org.briarproject.bramble.api.db.Metadata.REMOVE;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
|
||||
import static org.briarproject.bramble.api.sync.validation.MessageState.PENDING;
|
||||
import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN;
|
||||
@@ -178,7 +180,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " state INT NOT NULL,"
|
||||
+ " shared BOOLEAN NOT NULL,"
|
||||
+ " temporary BOOLEAN NOT NULL,"
|
||||
+ " length INT NOT NULL,"
|
||||
+ " length INT NOT NULL," // Includes message header
|
||||
+ " deleted BOOLEAN NOT NULL,"
|
||||
+ " blockCount INT NOT NULL,"
|
||||
+ " PRIMARY KEY (messageId),"
|
||||
@@ -230,7 +232,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
"CREATE TABLE blocks"
|
||||
+ " (messageId _HASH NOT NULL,"
|
||||
+ " blockNumber INT NOT NULL,"
|
||||
+ " blockLength INT NOT NULL,"
|
||||
+ " blockLength INT NOT NULL," // Excludes block header
|
||||
+ " data BLOB," // Null if message has been deleted
|
||||
+ " PRIMARY KEY (messageId, blockNumber),"
|
||||
+ " FOREIGN KEY (messageId)"
|
||||
@@ -2134,6 +2136,42 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getSmallMessagesToOffer(Connection txn,
|
||||
ContactId c, int maxMessages, int maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
long eta = now + maxLatency;
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT messageId FROM statuses"
|
||||
+ " WHERE contactId = ? AND state = ?"
|
||||
+ " AND length <= ?"
|
||||
+ " AND groupShared = TRUE AND messageShared = TRUE"
|
||||
+ " AND deleted = FALSE"
|
||||
+ " AND seen = FALSE AND requested = FALSE"
|
||||
+ " AND (expiry <= ? OR eta > ?)"
|
||||
+ " ORDER BY timestamp LIMIT ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
ps.setInt(3, MESSAGE_HEADER_LENGTH + MAX_BLOCK_LENGTH);
|
||||
ps.setLong(4, now);
|
||||
ps.setLong(5, eta);
|
||||
ps.setInt(6, maxMessages);
|
||||
rs = ps.executeQuery();
|
||||
List<MessageId> ids = new ArrayList<>();
|
||||
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
|
||||
rs.close();
|
||||
ps.close();
|
||||
return ids;
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs, LOG, WARNING);
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getMessagesToRequest(Connection txn,
|
||||
ContactId c, int maxMessages) throws DbException {
|
||||
@@ -2198,6 +2236,47 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getSmallMessagesToSend(Connection txn,
|
||||
ContactId c, int maxLength, int maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
long eta = now + maxLatency;
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT length, messageId FROM statuses"
|
||||
+ " WHERE contactId = ? AND state = ?"
|
||||
+ " AND length <= ?"
|
||||
+ " AND groupShared = TRUE AND messageShared = TRUE"
|
||||
+ " AND deleted = FALSE"
|
||||
+ " AND seen = FALSE"
|
||||
+ " AND (expiry <= ? OR eta > ?)"
|
||||
+ " ORDER BY timestamp";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
ps.setInt(3, MESSAGE_HEADER_LENGTH + MAX_BLOCK_LENGTH);
|
||||
ps.setLong(4, now);
|
||||
ps.setLong(5, eta);
|
||||
rs = ps.executeQuery();
|
||||
List<MessageId> ids = new ArrayList<>();
|
||||
int total = 0;
|
||||
while (rs.next()) {
|
||||
int length = rs.getInt(1);
|
||||
if (total + length > maxLength) break;
|
||||
ids.add(new MessageId(rs.getBytes(2)));
|
||||
total += length;
|
||||
}
|
||||
rs.close();
|
||||
ps.close();
|
||||
return ids;
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs, LOG, WARNING);
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getMessagesToValidate(Connection txn)
|
||||
throws DbException {
|
||||
|
||||
@@ -52,7 +52,7 @@ class Migration47_48 implements Migration<Connection> {
|
||||
s.execute(dbTypes.replaceTypes("CREATE TABLE blocks"
|
||||
+ " (messageId _HASH NOT NULL,"
|
||||
+ " blockNumber INT NOT NULL,"
|
||||
+ " blockLength INT NOT NULL,"
|
||||
+ " blockLength INT NOT NULL," // Excludes block header
|
||||
+ " data BLOB," // Null if message has been deleted
|
||||
+ " PRIMARY KEY (messageId, blockNumber),"
|
||||
+ " FOREIGN KEY (messageId)"
|
||||
|
||||
@@ -340,7 +340,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
try {
|
||||
Offer o = db.transactionWithNullableResult(false, txn -> {
|
||||
Offer offer = db.generateOffer(txn, contactId,
|
||||
MAX_MESSAGE_IDS, maxLatency);
|
||||
MAX_MESSAGE_IDS, maxLatency, true);
|
||||
setNextSendTime(db.getNextSendTime(txn, contactId));
|
||||
return offer;
|
||||
});
|
||||
|
||||
@@ -186,7 +186,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
Collection<Message> b =
|
||||
db.transactionWithNullableResult(false, txn ->
|
||||
db.generateBatch(txn, contactId,
|
||||
MAX_RECORD_PAYLOAD_BYTES, maxLatency));
|
||||
MAX_RECORD_PAYLOAD_BYTES, maxLatency,
|
||||
true));
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Generated batch: " + (b != null));
|
||||
if (b == null) decrementOutstandingQueries();
|
||||
|
||||
@@ -323,7 +323,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.generateBatch(transaction, contactId, 123, 456));
|
||||
db.generateBatch(transaction, contactId, 123, 456, true));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
@@ -331,7 +331,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.generateOffer(transaction, contactId, 123, 456));
|
||||
db.generateOffer(transaction, contactId, 123, 456, true));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
@@ -865,7 +865,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
will(returnValue(txn));
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getMessagesToSend(txn, contactId,
|
||||
oneOf(database).getSmallMessagesToSend(txn, contactId,
|
||||
MAX_MESSAGE_LENGTH * 2, maxLatency);
|
||||
will(returnValue(ids));
|
||||
oneOf(database).getMessage(txn, messageId);
|
||||
@@ -885,7 +885,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertEquals(messages, db.generateBatch(transaction, contactId,
|
||||
MAX_MESSAGE_LENGTH * 2, maxLatency)));
|
||||
MAX_MESSAGE_LENGTH * 2, maxLatency, true)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -897,7 +897,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
will(returnValue(txn));
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency);
|
||||
oneOf(database).getSmallMessagesToOffer(txn, contactId, 123,
|
||||
maxLatency);
|
||||
will(returnValue(ids));
|
||||
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
|
||||
maxLatency);
|
||||
@@ -909,7 +910,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
Offer o = db.generateOffer(transaction, contactId, 123, maxLatency);
|
||||
Offer o = db.generateOffer(transaction, contactId, 123, maxLatency,
|
||||
true);
|
||||
assertNotNull(o);
|
||||
assertEquals(ids, o.getMessageIds());
|
||||
});
|
||||
|
||||
@@ -64,7 +64,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noMsgTxn));
|
||||
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
|
||||
with(any(int.class)), with(MAX_LATENCY));
|
||||
with(any(int.class)), with(MAX_LATENCY), with(true));
|
||||
will(returnValue(null));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
@@ -101,7 +101,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(msgTxn));
|
||||
oneOf(db).generateBatch(with(msgTxn), with(contactId),
|
||||
with(any(int.class)), with(MAX_LATENCY));
|
||||
with(any(int.class)), with(MAX_LATENCY), with(true));
|
||||
will(returnValue(singletonList(message)));
|
||||
oneOf(recordWriter).writeMessage(message);
|
||||
// No more acks
|
||||
@@ -113,7 +113,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noMsgTxn));
|
||||
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
|
||||
with(any(int.class)), with(MAX_LATENCY));
|
||||
with(any(int.class)), with(MAX_LATENCY), with(true));
|
||||
will(returnValue(null));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
|
||||
Reference in New Issue
Block a user