From 4490a2cd3fd1a6fd2c7e1c036a17d89d016ebc00 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 20 Jun 2019 13:59:47 +0100 Subject: [PATCH] Add database methods for selecting small messages. --- .../bramble/api/db/DatabaseComponent.java | 8 +- .../bramble/api/sync/SyncConstants.java | 7 +- .../org/briarproject/bramble/db/Database.java | 18 ++++ .../bramble/db/DatabaseComponentImpl.java | 21 +++-- .../briarproject/bramble/db/JdbcDatabase.java | 83 ++++++++++++++++++- .../bramble/db/Migration47_48.java | 2 +- .../bramble/sync/DuplexOutgoingSession.java | 2 +- .../bramble/sync/SimplexOutgoingSession.java | 3 +- .../bramble/db/DatabaseComponentImplTest.java | 14 ++-- .../sync/SimplexOutgoingSessionTest.java | 6 +- 10 files changed, 139 insertions(+), 25 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 7c525bcd3..d22a29fa2 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -163,19 +163,23 @@ public interface DatabaseComponent extends TransactionManager { * less than or equal to the given length, for transmission over a * transport with the given maximum latency. Returns null if there are no * sendable messages that fit in the given length. + * + * @param small True if only single-block messages should be sent */ @Nullable Collection generateBatch(Transaction txn, ContactId c, - int maxLength, int maxLatency) throws DbException; + int maxLength, int maxLatency, boolean small) throws DbException; /** * Returns an offer for the given contact for transmission over a * transport with the given maximum latency, or null if there are no * messages to offer. + * + * @param small True if only single-block messages should be offered */ @Nullable Offer generateOffer(Transaction txn, ContactId c, int maxMessages, - int maxLatency) throws DbException; + int maxLatency, boolean small) throws DbException; /** * Returns a request for the given contact, or null if there are no diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java index 2e9241bed..c4a8a778b 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java @@ -29,10 +29,15 @@ public interface SyncConstants { */ int MESSAGE_HEADER_LENGTH = UniqueId.LENGTH + 8; + /** + * The maximum length of a block in bytes. + */ + int MAX_BLOCK_LENGTH = 32 * 1024; // 32 KiB + /** * The maximum length of a message body in bytes. */ - int MAX_MESSAGE_BODY_LENGTH = 32 * 1024; // 32 KiB + int MAX_MESSAGE_BODY_LENGTH = MAX_BLOCK_LENGTH; /** * The maximum length of a message in bytes. diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index f6745af24..26f69a4c3 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -456,6 +456,15 @@ interface Database { Collection getMessagesToOffer(T txn, ContactId c, int maxMessages, int maxLatency) throws DbException; + /** + * Returns the IDs of some single-block messages that are eligible to be + * offered to the given contact, up to the given number of messages. + *

+ * Read-only. + */ + Collection getSmallMessagesToOffer(T txn, ContactId c, + int maxMessages, int maxLatency) throws DbException; + /** * Returns the IDs of some messages that are eligible to be requested from * the given contact, up to the given number of messages. @@ -474,6 +483,15 @@ interface Database { Collection getMessagesToSend(T txn, ContactId c, int maxLength, int maxLatency) throws DbException; + /** + * Returns the IDs of some single-block messages that are eligible to be + * sent to the given contact, up to the given total length. + *

+ * Read-only. + */ + Collection getSmallMessagesToSend(T txn, ContactId c, + int maxLength, int maxLatency) throws DbException; + /** * Returns the IDs of any messages that need to be validated. *

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 6ec1760af..dc89c05a4 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -406,13 +406,16 @@ class DatabaseComponentImpl implements DatabaseComponent { @Nullable @Override public Collection generateBatch(Transaction transaction, - ContactId c, int maxLength, int maxLatency) throws DbException { + ContactId c, int maxLength, int maxLatency, boolean small) + throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - Collection ids = - db.getMessagesToSend(txn, c, maxLength, maxLatency); + Collection ids; + if (small) + ids = db.getSmallMessagesToSend(txn, c, maxLength, maxLatency); + else ids = db.getMessagesToSend(txn, c, maxLength, maxLatency); List messages = new ArrayList<>(ids.size()); for (MessageId m : ids) { messages.add(db.getMessage(txn, m)); @@ -427,13 +430,15 @@ class DatabaseComponentImpl implements DatabaseComponent { @Nullable @Override public Offer generateOffer(Transaction transaction, ContactId c, - int maxMessages, int maxLatency) throws DbException { + int maxMessages, int maxLatency, boolean small) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - Collection ids = - db.getMessagesToOffer(txn, c, maxMessages, maxLatency); + Collection ids; + if (small) + ids = db.getSmallMessagesToOffer(txn, c, maxMessages, maxLatency); + else ids = db.getMessagesToOffer(txn, c, maxMessages, maxLatency); if (ids.isEmpty()) return null; for (MessageId m : ids) db.updateExpiryTimeAndEta(txn, c, m, maxLatency); @@ -448,8 +453,8 @@ class DatabaseComponentImpl implements DatabaseComponent { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - Collection ids = db.getMessagesToRequest(txn, c, - maxMessages); + Collection ids = + db.getMessagesToRequest(txn, c, maxMessages); if (ids.isEmpty()) return null; db.removeOfferedMessages(txn, c, ids); return new Request(ids); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index eca3c181c..5e8b5fec5 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -75,6 +75,8 @@ import static org.briarproject.bramble.api.db.Metadata.REMOVE; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH; +import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED; import static org.briarproject.bramble.api.sync.validation.MessageState.PENDING; import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN; @@ -178,7 +180,7 @@ abstract class JdbcDatabase implements Database { + " state INT NOT NULL," + " shared BOOLEAN NOT NULL," + " temporary BOOLEAN NOT NULL," - + " length INT NOT NULL," + + " length INT NOT NULL," // Includes message header + " deleted BOOLEAN NOT NULL," + " blockCount INT NOT NULL," + " PRIMARY KEY (messageId)," @@ -230,7 +232,7 @@ abstract class JdbcDatabase implements Database { "CREATE TABLE blocks" + " (messageId _HASH NOT NULL," + " blockNumber INT NOT NULL," - + " blockLength INT NOT NULL," + + " blockLength INT NOT NULL," // Excludes block header + " data BLOB," // Null if message has been deleted + " PRIMARY KEY (messageId, blockNumber)," + " FOREIGN KEY (messageId)" @@ -2134,6 +2136,42 @@ abstract class JdbcDatabase implements Database { } } + @Override + public Collection getSmallMessagesToOffer(Connection txn, + ContactId c, int maxMessages, int maxLatency) throws DbException { + long now = clock.currentTimeMillis(); + long eta = now + maxLatency; + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT messageId FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND length <= ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE" + + " AND seen = FALSE AND requested = FALSE" + + " AND (expiry <= ? OR eta > ?)" + + " ORDER BY timestamp LIMIT ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + ps.setInt(3, MESSAGE_HEADER_LENGTH + MAX_BLOCK_LENGTH); + ps.setLong(4, now); + ps.setLong(5, eta); + ps.setInt(6, maxMessages); + rs = ps.executeQuery(); + List ids = new ArrayList<>(); + while (rs.next()) ids.add(new MessageId(rs.getBytes(1))); + rs.close(); + ps.close(); + return ids; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public Collection getMessagesToRequest(Connection txn, ContactId c, int maxMessages) throws DbException { @@ -2198,6 +2236,47 @@ abstract class JdbcDatabase implements Database { } } + @Override + public Collection getSmallMessagesToSend(Connection txn, + ContactId c, int maxLength, int maxLatency) throws DbException { + long now = clock.currentTimeMillis(); + long eta = now + maxLatency; + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT length, messageId FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND length <= ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE" + + " AND seen = FALSE" + + " AND (expiry <= ? OR eta > ?)" + + " ORDER BY timestamp"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + ps.setInt(3, MESSAGE_HEADER_LENGTH + MAX_BLOCK_LENGTH); + ps.setLong(4, now); + ps.setLong(5, eta); + rs = ps.executeQuery(); + List ids = new ArrayList<>(); + int total = 0; + while (rs.next()) { + int length = rs.getInt(1); + if (total + length > maxLength) break; + ids.add(new MessageId(rs.getBytes(2))); + total += length; + } + rs.close(); + ps.close(); + return ids; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public Collection getMessagesToValidate(Connection txn) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java index d3edd0b2a..f31af980f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Migration47_48.java @@ -52,7 +52,7 @@ class Migration47_48 implements Migration { s.execute(dbTypes.replaceTypes("CREATE TABLE blocks" + " (messageId _HASH NOT NULL," + " blockNumber INT NOT NULL," - + " blockLength INT NOT NULL," + + " blockLength INT NOT NULL," // Excludes block header + " data BLOB," // Null if message has been deleted + " PRIMARY KEY (messageId, blockNumber)," + " FOREIGN KEY (messageId)" diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index 6eed42511..23437b800 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -340,7 +340,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { try { Offer o = db.transactionWithNullableResult(false, txn -> { Offer offer = db.generateOffer(txn, contactId, - MAX_MESSAGE_IDS, maxLatency); + MAX_MESSAGE_IDS, maxLatency, true); setNextSendTime(db.getNextSendTime(txn, contactId)); return offer; }); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 1d32ca4ee..7a6513ddf 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -186,7 +186,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener { Collection b = db.transactionWithNullableResult(false, txn -> db.generateBatch(txn, contactId, - MAX_RECORD_PAYLOAD_BYTES, maxLatency)); + MAX_RECORD_PAYLOAD_BYTES, maxLatency, + true)); if (LOG.isLoggable(INFO)) LOG.info("Generated batch: " + (b != null)); if (b == null) decrementOutstandingQueries(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index b8fb04bc8..244f3bc48 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -323,7 +323,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { try { db.transaction(false, transaction -> - db.generateBatch(transaction, contactId, 123, 456)); + db.generateBatch(transaction, contactId, 123, 456, true)); fail(); } catch (NoSuchContactException expected) { // Expected @@ -331,7 +331,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { try { db.transaction(false, transaction -> - db.generateOffer(transaction, contactId, 123, 456)); + db.generateOffer(transaction, contactId, 123, 456, true)); fail(); } catch (NoSuchContactException expected) { // Expected @@ -865,7 +865,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { will(returnValue(txn)); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).getMessagesToSend(txn, contactId, + oneOf(database).getSmallMessagesToSend(txn, contactId, MAX_MESSAGE_LENGTH * 2, maxLatency); will(returnValue(ids)); oneOf(database).getMessage(txn, messageId); @@ -885,7 +885,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { db.transaction(false, transaction -> assertEquals(messages, db.generateBatch(transaction, contactId, - MAX_MESSAGE_LENGTH * 2, maxLatency))); + MAX_MESSAGE_LENGTH * 2, maxLatency, true))); } @Test @@ -897,7 +897,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { will(returnValue(txn)); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency); + oneOf(database).getSmallMessagesToOffer(txn, contactId, 123, + maxLatency); will(returnValue(ids)); oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId, maxLatency); @@ -909,7 +910,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { eventExecutor, shutdownManager); db.transaction(false, transaction -> { - Offer o = db.generateOffer(transaction, contactId, 123, maxLatency); + Offer o = db.generateOffer(transaction, contactId, 123, maxLatency, + true); assertNotNull(o); assertEquals(ids, o.getMessageIds()); }); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index df8dc7b87..5952b55b3 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -64,7 +64,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + with(any(int.class)), with(MAX_LATENCY), with(true)); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -101,7 +101,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(msgTxn)); oneOf(db).generateBatch(with(msgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + with(any(int.class)), with(MAX_LATENCY), with(true)); will(returnValue(singletonList(message))); oneOf(recordWriter).writeMessage(message); // No more acks @@ -113,7 +113,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + with(any(int.class)), with(MAX_LATENCY), with(true)); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream();