diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 2771302a2..53ed7416a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -190,6 +190,18 @@ public interface DatabaseComponent extends TransactionManager { Collection generateBatch(Transaction txn, ContactId c, int maxLength, int maxLatency) throws DbException; + /** + * Returns a batch of messages for the given contact containing the + * messages with the given IDs, for transmission over a transport with + * the given maximum latency. + *

+ * If any of the given messages are not in the database or are not visible + * to the contact, they are omitted from the batch without throwing an + * exception. + */ + Collection generateBatch(Transaction txn, ContactId c, + Collection ids, int maxLatency) throws DbException; + /** * Returns an offer for the given contact for transmission over a * transport with the given maximum latency, or null if there are no @@ -446,6 +458,17 @@ public interface DatabaseComponent extends TransactionManager { MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m) throws DbException; + /** + * Returns the IDs of all messages that are eligible to be sent to the + * given contact, together with their raw lengths. This may include + * messages that have already been sent and are not yet due for + * retransmission. + *

+ * Read-only. + */ + Map getUnackedMessagesToSend(Transaction txn, + ContactId c) throws DbException; + /** * Returns the next time (in milliseconds since the Unix epoch) when a * message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index 705913ee7..cb386ab5c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -496,11 +496,28 @@ interface Database { * Returns the IDs of some messages that are eligible to be sent to the * given contact, up to the given total length. *

+ * Unlike {@link #getUnackedMessagesToSend(Object, ContactId)} this method + * does not return messages that have already been sent unless they are + * due for retransmission. + *

* Read-only. */ Collection getMessagesToSend(T txn, ContactId c, int maxLength, int maxLatency) throws DbException; + /** + * Returns the IDs of all messages that are eligible to be sent to the + * given contact, together with their raw lengths. + *

+ * Unlike {@link #getMessagesToSend(Object, ContactId, int, int)} this + * method may return messages that have already been sent and are not yet + * due for retransmission. + *

+ * Read-only. + */ + Map getUnackedMessagesToSend(T txn, ContactId c) + throws DbException; + /** * Returns the IDs of any messages that need to be validated. *

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index b2422f9bc..d4c8ef682 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -436,6 +436,32 @@ class DatabaseComponentImpl implements DatabaseComponent { return messages; } + @Override + public Collection generateBatch(Transaction transaction, + ContactId c, Collection ids, int maxLatency) + throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + long totalLength = 0; + List messages = new ArrayList<>(ids.size()); + List sentIds = new ArrayList<>(ids.size()); + for (MessageId m : ids) { + if (db.containsVisibleMessage(txn, c, m)) { + Message message = db.getMessage(txn, m); + totalLength += message.getRawLength(); + messages.add(message); + sentIds.add(m); + db.updateExpiryTimeAndEta(txn, c, m, maxLatency); + } + } + if (messages.isEmpty()) return messages; + db.lowerRequestedFlag(txn, c, sentIds); + transaction.attach(new MessagesSentEvent(c, sentIds, totalLength)); + return messages; + } + @Nullable @Override public Offer generateOffer(Transaction transaction, ContactId c, @@ -714,6 +740,16 @@ class DatabaseComponentImpl implements DatabaseComponent { return status; } + @Override + public Map getUnackedMessagesToSend( + Transaction transaction, + ContactId c) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.getUnackedMessagesToSend(txn, c); + } + @Override public Map getMessageDependencies( Transaction transaction, MessageId m) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 224b20bf1..32212193b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -51,6 +51,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -344,6 +345,11 @@ abstract class JdbcDatabase implements Database { "CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp" + " ON statuses (contactId, timestamp)"; + private static final String + INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP = + "CREATE INDEX IF NOT EXISTS statusesByContactIdTxCountTimestamp" + + " ON statuses (contactId, txCount, timestamp)"; + private static final String INDEX_MESSAGES_BY_CLEANUP_DEADLINE = "CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline" + " ON messages (cleanupDeadline)"; @@ -570,6 +576,7 @@ abstract class JdbcDatabase implements Database { s.executeUpdate(INDEX_MESSAGE_DEPENDENCIES_BY_DEPENDENCY_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP); + s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP); s.executeUpdate(INDEX_MESSAGES_BY_CLEANUP_DEADLINE); s.close(); } catch (SQLException e) { @@ -2259,6 +2266,37 @@ abstract class JdbcDatabase implements Database { } } + @Override + public Map getUnackedMessagesToSend(Connection txn, + ContactId c) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT length, messageId FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE AND seen = FALSE" + + " ORDER BY txCount, timestamp"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + rs = ps.executeQuery(); + Map results = new LinkedHashMap<>(); + while (rs.next()) { + int length = rs.getInt(1); + MessageId id = new MessageId(rs.getBytes(2)); + results.put(id, length); + } + rs.close(); + ps.close(); + return results; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public Collection getMessagesToValidate(Connection txn) throws DbException {