From 9a58b37ce2fcdfa64253a209bb6cf003cfae41f9 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Mon, 7 Jun 2021 17:27:46 +0100 Subject: [PATCH 1/8] Add database methods for sending unacked messages. --- .../bramble/api/db/DatabaseComponent.java | 23 +++++++++++ .../org/briarproject/bramble/db/Database.java | 17 +++++++++ .../bramble/db/DatabaseComponentImpl.java | 36 ++++++++++++++++++ .../briarproject/bramble/db/JdbcDatabase.java | 38 +++++++++++++++++++ 4 files changed, 114 insertions(+) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 2771302a2..53ed7416a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -190,6 +190,18 @@ public interface DatabaseComponent extends TransactionManager { Collection generateBatch(Transaction txn, ContactId c, int maxLength, int maxLatency) throws DbException; + /** + * Returns a batch of messages for the given contact containing the + * messages with the given IDs, for transmission over a transport with + * the given maximum latency. + *

+ * If any of the given messages are not in the database or are not visible + * to the contact, they are omitted from the batch without throwing an + * exception. + */ + Collection generateBatch(Transaction txn, ContactId c, + Collection ids, int maxLatency) throws DbException; + /** * Returns an offer for the given contact for transmission over a * transport with the given maximum latency, or null if there are no @@ -446,6 +458,17 @@ public interface DatabaseComponent extends TransactionManager { MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m) throws DbException; + /** + * Returns the IDs of all messages that are eligible to be sent to the + * given contact, together with their raw lengths. This may include + * messages that have already been sent and are not yet due for + * retransmission. + *

+ * Read-only. + */ + Map getUnackedMessagesToSend(Transaction txn, + ContactId c) throws DbException; + /** * Returns the next time (in milliseconds since the Unix epoch) when a * message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index 705913ee7..cb386ab5c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -496,11 +496,28 @@ interface Database { * Returns the IDs of some messages that are eligible to be sent to the * given contact, up to the given total length. *

+ * Unlike {@link #getUnackedMessagesToSend(Object, ContactId)} this method + * does not return messages that have already been sent unless they are + * due for retransmission. + *

* Read-only. */ Collection getMessagesToSend(T txn, ContactId c, int maxLength, int maxLatency) throws DbException; + /** + * Returns the IDs of all messages that are eligible to be sent to the + * given contact, together with their raw lengths. + *

+ * Unlike {@link #getMessagesToSend(Object, ContactId, int, int)} this + * method may return messages that have already been sent and are not yet + * due for retransmission. + *

+ * Read-only. + */ + Map getUnackedMessagesToSend(T txn, ContactId c) + throws DbException; + /** * Returns the IDs of any messages that need to be validated. *

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index b2422f9bc..d4c8ef682 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -436,6 +436,32 @@ class DatabaseComponentImpl implements DatabaseComponent { return messages; } + @Override + public Collection generateBatch(Transaction transaction, + ContactId c, Collection ids, int maxLatency) + throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + long totalLength = 0; + List messages = new ArrayList<>(ids.size()); + List sentIds = new ArrayList<>(ids.size()); + for (MessageId m : ids) { + if (db.containsVisibleMessage(txn, c, m)) { + Message message = db.getMessage(txn, m); + totalLength += message.getRawLength(); + messages.add(message); + sentIds.add(m); + db.updateExpiryTimeAndEta(txn, c, m, maxLatency); + } + } + if (messages.isEmpty()) return messages; + db.lowerRequestedFlag(txn, c, sentIds); + transaction.attach(new MessagesSentEvent(c, sentIds, totalLength)); + return messages; + } + @Nullable @Override public Offer generateOffer(Transaction transaction, ContactId c, @@ -714,6 +740,16 @@ class DatabaseComponentImpl implements DatabaseComponent { return status; } + @Override + public Map getUnackedMessagesToSend( + Transaction transaction, + ContactId c) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.getUnackedMessagesToSend(txn, c); + } + @Override public Map getMessageDependencies( Transaction transaction, MessageId m) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 224b20bf1..32212193b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -51,6 +51,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -344,6 +345,11 @@ abstract class JdbcDatabase implements Database { "CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp" + " ON statuses (contactId, timestamp)"; + private static final String + INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP = + "CREATE INDEX IF NOT EXISTS statusesByContactIdTxCountTimestamp" + + " ON statuses (contactId, txCount, timestamp)"; + private static final String INDEX_MESSAGES_BY_CLEANUP_DEADLINE = "CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline" + " ON messages (cleanupDeadline)"; @@ -570,6 +576,7 @@ abstract class JdbcDatabase implements Database { s.executeUpdate(INDEX_MESSAGE_DEPENDENCIES_BY_DEPENDENCY_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP); + s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP); s.executeUpdate(INDEX_MESSAGES_BY_CLEANUP_DEADLINE); s.close(); } catch (SQLException e) { @@ -2259,6 +2266,37 @@ abstract class JdbcDatabase implements Database { } } + @Override + public Map getUnackedMessagesToSend(Connection txn, + ContactId c) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT length, messageId FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE AND seen = FALSE" + + " ORDER BY txCount, timestamp"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + rs = ps.executeQuery(); + Map results = new LinkedHashMap<>(); + while (rs.next()) { + int length = rs.getInt(1); + MessageId id = new MessageId(rs.getBytes(2)); + results.put(id, length); + } + rs.close(); + ps.close(); + return results; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public Collection getMessagesToValidate(Connection txn) throws DbException { From 77a3199aac5f87c0748826ac9a057d4cf82a596f Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 8 Jun 2021 12:10:52 +0100 Subject: [PATCH 2/8] Update SimplexOutgoingSession to support sending unacked messages. --- .../bramble/sync/SimplexOutgoingSession.java | 97 ++++++++++++++++++- .../bramble/sync/SyncSessionFactoryImpl.java | 2 +- .../sync/SimplexOutgoingSessionTest.java | 4 +- 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 1d32ca4ee..d069d38b0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -15,6 +15,7 @@ import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.Versions; @@ -22,7 +23,11 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -61,6 +66,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final ContactId contactId; private final TransportId transportId; private final int maxLatency; + private final boolean eager; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; private final AtomicInteger outstandingQueries; @@ -70,7 +76,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, ContactId contactId, TransportId transportId, - int maxLatency, StreamWriter streamWriter, + int maxLatency, boolean eager, StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; this.dbExecutor = dbExecutor; @@ -78,6 +84,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.contactId = contactId; this.transportId = transportId; this.maxLatency = maxLatency; + this.eager = eager; this.streamWriter = streamWriter; this.recordWriter = recordWriter; outstandingQueries = new AtomicInteger(2); // One per type of record @@ -93,7 +100,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener { recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); // Start a query for each type of record dbExecutor.execute(new GenerateAck()); - dbExecutor.execute(new GenerateBatch()); + if (eager) dbExecutor.execute(new LoadUnackedMessageIds()); + else dbExecutor.execute(new GenerateBatch()); // Write records until interrupted or no more records to write try { while (!interrupted) { @@ -138,6 +146,91 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } } + private class LoadUnackedMessageIds implements Runnable { + + @DatabaseExecutor + @Override + public void run() { + if (interrupted) return; + try { + Map ids = + db.transactionWithResult(true, txn -> + db.getUnackedMessagesToSend(txn, contactId)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " unacked messages to send"); + } + if (ids.isEmpty()) decrementOutstandingQueries(); + else dbExecutor.execute(new GenerateEagerBatch(ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); + } + } + } + + private class GenerateEagerBatch implements Runnable { + + private final Map ids; + + private GenerateEagerBatch(Map ids) { + this.ids = ids; + } + + @DatabaseExecutor + @Override + public void run() { + if (interrupted) return; + // Take some message IDs from `ids` to form a batch + Collection batchIds = new ArrayList<>(); + long totalLength = 0; + Iterator> it = + ids.entrySet().iterator(); + while (it.hasNext()) { + // Check whether the next message will fit in the batch + Entry e = it.next(); + int length = e.getValue(); + if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break; + // Add the message to the batch + it.remove(); + batchIds.add(e.getKey()); + totalLength += length; + } + if (batchIds.isEmpty()) throw new AssertionError(); + try { + Collection batch = + db.transactionWithResult(false, txn -> + db.generateBatch(txn, contactId, batchIds, + maxLatency)); + writerTasks.add(new WriteEagerBatch(batch, ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); + } + } + } + + private class WriteEagerBatch implements ThrowingRunnable { + + private final Collection batch; + private final Map ids; + + private WriteEagerBatch(Collection batch, + Map ids) { + this.batch = batch; + this.ids = ids; + } + + @IoExecutor + @Override + public void run() throws IOException { + if (interrupted) return; + for (Message m : batch) recordWriter.writeMessage(m); + LOG.info("Sent eager batch"); + if (ids.isEmpty()) decrementOutstandingQueries(); + else dbExecutor.execute(new GenerateEagerBatch(ids)); + } + } + private class GenerateAck implements Runnable { @DatabaseExecutor diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 4c590df73..74ec3e51a 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -65,7 +65,7 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, - maxLatency, streamWriter, recordWriter); + maxLatency, false, streamWriter, recordWriter); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index df8dc7b87..0dd7baf2f 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -46,7 +46,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter); + false, streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -80,7 +80,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { Ack ack = new Ack(singletonList(messageId)); SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter); + false, streamWriter, recordWriter); Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false); From 847650f280360d9f6bc3974f732cd30309748075 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 10 Jun 2021 17:04:15 +0100 Subject: [PATCH 3/8] Replace inner classes with lambdas. --- .../bramble/sync/SimplexOutgoingSession.java | 240 +++++++----------- 1 file changed, 92 insertions(+), 148 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index d069d38b0..f5c7fcbb6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -99,9 +99,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener { // Send our supported protocol versions recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); // Start a query for each type of record - dbExecutor.execute(new GenerateAck()); - if (eager) dbExecutor.execute(new LoadUnackedMessageIds()); - else dbExecutor.execute(new GenerateBatch()); + dbExecutor.execute(this::generateAck); + if (eager) dbExecutor.execute(this::loadUnackedMessageIds); + else dbExecutor.execute(this::generateBatch); // Write records until interrupted or no more records to write try { while (!interrupted) { @@ -146,166 +146,110 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } } - private class LoadUnackedMessageIds implements Runnable { - - @DatabaseExecutor - @Override - public void run() { - if (interrupted) return; - try { - Map ids = - db.transactionWithResult(true, txn -> - db.getUnackedMessagesToSend(txn, contactId)); - if (LOG.isLoggable(INFO)) { - LOG.info(ids.size() + " unacked messages to send"); - } - if (ids.isEmpty()) decrementOutstandingQueries(); - else dbExecutor.execute(new GenerateEagerBatch(ids)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); + @DatabaseExecutor + private void loadUnackedMessageIds() { + if (interrupted) return; + try { + Map ids = db.transactionWithResult(true, txn -> + db.getUnackedMessagesToSend(txn, contactId)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " unacked messages to send"); } - } - } - - private class GenerateEagerBatch implements Runnable { - - private final Map ids; - - private GenerateEagerBatch(Map ids) { - this.ids = ids; - } - - @DatabaseExecutor - @Override - public void run() { - if (interrupted) return; - // Take some message IDs from `ids` to form a batch - Collection batchIds = new ArrayList<>(); - long totalLength = 0; - Iterator> it = - ids.entrySet().iterator(); - while (it.hasNext()) { - // Check whether the next message will fit in the batch - Entry e = it.next(); - int length = e.getValue(); - if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break; - // Add the message to the batch - it.remove(); - batchIds.add(e.getKey()); - totalLength += length; - } - if (batchIds.isEmpty()) throw new AssertionError(); - try { - Collection batch = - db.transactionWithResult(false, txn -> - db.generateBatch(txn, contactId, batchIds, - maxLatency)); - writerTasks.add(new WriteEagerBatch(batch, ids)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } - } - } - - private class WriteEagerBatch implements ThrowingRunnable { - - private final Collection batch; - private final Map ids; - - private WriteEagerBatch(Collection batch, - Map ids) { - this.batch = batch; - this.ids = ids; - } - - @IoExecutor - @Override - public void run() throws IOException { - if (interrupted) return; - for (Message m : batch) recordWriter.writeMessage(m); - LOG.info("Sent eager batch"); if (ids.isEmpty()) decrementOutstandingQueries(); - else dbExecutor.execute(new GenerateEagerBatch(ids)); + else dbExecutor.execute(() -> generateEagerBatch(ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } } - private class GenerateAck implements Runnable { - - @DatabaseExecutor - @Override - public void run() { - if (interrupted) return; - try { - Ack a = db.transactionWithNullableResult(false, txn -> - db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated ack: " + (a != null)); - if (a == null) decrementOutstandingQueries(); - else writerTasks.add(new WriteAck(a)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } + @DatabaseExecutor + private void generateEagerBatch(Map ids) { + if (interrupted) return; + // Take some message IDs from `ids` to form a batch + Collection batchIds = new ArrayList<>(); + long totalLength = 0; + Iterator> it = ids.entrySet().iterator(); + while (it.hasNext()) { + // Check whether the next message will fit in the batch + Entry e = it.next(); + int length = e.getValue(); + if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break; + // Add the message to the batch + it.remove(); + batchIds.add(e.getKey()); + totalLength += length; + } + if (batchIds.isEmpty()) throw new AssertionError(); + try { + Collection batch = + db.transactionWithResult(false, txn -> + db.generateBatch(txn, contactId, batchIds, + maxLatency)); + writerTasks.add(() -> writeEagerBatch(batch, ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } } - private class WriteAck implements ThrowingRunnable { + @IoExecutor + private void writeEagerBatch(Collection batch, + Map ids) throws IOException { + if (interrupted) return; + for (Message m : batch) recordWriter.writeMessage(m); + LOG.info("Sent eager batch"); + if (ids.isEmpty()) decrementOutstandingQueries(); + else dbExecutor.execute(() -> generateEagerBatch(ids)); + } - private final Ack ack; - - private WriteAck(Ack ack) { - this.ack = ack; - } - - @IoExecutor - @Override - public void run() throws IOException { - if (interrupted) return; - recordWriter.writeAck(ack); - LOG.info("Sent ack"); - dbExecutor.execute(new GenerateAck()); + @DatabaseExecutor + private void generateAck() { + if (interrupted) return; + try { + Ack a = db.transactionWithNullableResult(false, txn -> + db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated ack: " + (a != null)); + if (a == null) decrementOutstandingQueries(); + else writerTasks.add(() -> writeAck(a)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } } - private class GenerateBatch implements Runnable { + @IoExecutor + private void writeAck(Ack ack) throws IOException { + if (interrupted) return; + recordWriter.writeAck(ack); + LOG.info("Sent ack"); + dbExecutor.execute(this::generateAck); + } - @DatabaseExecutor - @Override - public void run() { - if (interrupted) return; - try { - Collection b = - db.transactionWithNullableResult(false, txn -> - db.generateBatch(txn, contactId, - MAX_RECORD_PAYLOAD_BYTES, maxLatency)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated batch: " + (b != null)); - if (b == null) decrementOutstandingQueries(); - else writerTasks.add(new WriteBatch(b)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } + @DatabaseExecutor + private void generateBatch() { + if (interrupted) return; + try { + Collection b = + db.transactionWithNullableResult(false, txn -> + db.generateBatch(txn, contactId, + MAX_RECORD_PAYLOAD_BYTES, maxLatency)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated batch: " + (b != null)); + if (b == null) decrementOutstandingQueries(); + else writerTasks.add(() -> writeBatch(b)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } } - private class WriteBatch implements ThrowingRunnable { - - private final Collection batch; - - private WriteBatch(Collection batch) { - this.batch = batch; - } - - @IoExecutor - @Override - public void run() throws IOException { - if (interrupted) return; - for (Message m : batch) recordWriter.writeMessage(m); - LOG.info("Sent batch"); - dbExecutor.execute(new GenerateBatch()); - } + @IoExecutor + private void writeBatch(Collection batch) throws IOException { + if (interrupted) return; + for (Message m : batch) recordWriter.writeMessage(m); + LOG.info("Sent batch"); + dbExecutor.execute(this::generateBatch); } } From a960bfb2c164de2a9ec897d575c4fa917ac37cc9 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 10 Jun 2021 17:28:30 +0100 Subject: [PATCH 4/8] Add tests for eager retransmission. --- .../sync/SimplexOutgoingSessionTest.java | 120 ++++++++++++++++-- 1 file changed, 111 insertions(+), 9 deletions(-) diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index 0dd7baf2f..9bcf7cc3f 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -17,9 +17,14 @@ import org.briarproject.bramble.test.DbExpectations; import org.briarproject.bramble.test.ImmediateExecutor; import org.junit.Test; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.Executor; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getMessage; @@ -39,14 +44,19 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { private final Executor dbExecutor = new ImmediateExecutor(); private final ContactId contactId = getContactId(); private final TransportId transportId = getTransportId(); - private final Message message = getMessage(new GroupId(getRandomId())); - private final MessageId messageId = message.getId(); + private final Ack ack = + new Ack(singletonList(new MessageId(getRandomId()))); + private final Message message = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); + private final Message message1 = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); + Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -63,8 +73,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // No messages to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); - oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(noMsgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -75,12 +85,45 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { session.run(); } + @Test + public void testNothingToSendEagerly() throws Exception { + SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + true, streamWriter, recordWriter); + + Transaction noAckTxn = new Transaction(null, false); + Transaction noIdsTxn = new Transaction(null, true); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // No acks to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // No messages to send + oneOf(db).transactionWithResult(with(true), + withDbCallable(noIdsTxn)); + oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId); + will(returnValue(emptyMap())); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } + @Test public void testSomethingToSend() throws Exception { - Ack ack = new Ack(singletonList(messageId)); SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); + Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false); @@ -100,8 +143,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // One message to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(msgTxn)); - oneOf(db).generateBatch(with(msgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(msgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(singletonList(message))); oneOf(recordWriter).writeMessage(message); // No more acks @@ -112,8 +155,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // No more messages oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); - oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(noMsgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -123,4 +166,63 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { session.run(); } + + @Test + public void testSomethingToSendEagerly() throws Exception { + SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + true, streamWriter, recordWriter); + + Map unacked = new LinkedHashMap<>(); + unacked.put(message.getId(), message.getRawLength()); + unacked.put(message1.getId(), message1.getRawLength()); + + Transaction ackTxn = new Transaction(null, false); + Transaction noAckTxn = new Transaction(null, false); + Transaction idsTxn = new Transaction(null, true); + Transaction msgTxn = new Transaction(null, false); + Transaction msgTxn1 = new Transaction(null, false); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // One ack to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(ackTxn)); + oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(ack)); + oneOf(recordWriter).writeAck(ack); + // No more acks + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // Two messages to send + oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn)); + oneOf(db).getUnackedMessagesToSend(idsTxn, contactId); + will(returnValue(unacked)); + // Send the first message + oneOf(db).transactionWithResult(with(false), + withDbCallable(msgTxn)); + oneOf(db).generateBatch(msgTxn, contactId, + singletonList(message.getId()), MAX_LATENCY); + will(returnValue(singletonList(message))); + oneOf(recordWriter).writeMessage(message); + // Send the second message + oneOf(db).transactionWithResult(with(false), + withDbCallable(msgTxn1)); + oneOf(db).generateBatch(msgTxn1, contactId, + singletonList(message1.getId()), MAX_LATENCY); + will(returnValue(singletonList(message1))); + oneOf(recordWriter).writeMessage(message1); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } } From a5ce40034185873e4ea3b762e1781bc172787db6 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 10 Jun 2021 17:42:19 +0100 Subject: [PATCH 5/8] Use eager retransmission if the transport is lossy and cheap. --- .../bramble/api/plugin/TransportConnectionWriter.java | 5 +++++ .../duplex/AbstractDuplexTransportConnection.java | 5 +++++ .../bramble/api/plugin/simplex/SimplexPlugin.java | 6 ++++++ .../bramble/api/sync/SyncSessionFactory.java | 2 +- .../connection/OutgoingSimplexSyncConnection.java | 4 +++- .../plugin/file/AbstractRemovableDrivePlugin.java | 5 +++++ .../bramble/plugin/file/FileTransportWriter.java | 5 +++++ .../bramble/plugin/file/RemovableDriveWriterTask.java | 5 +++++ .../plugin/file/TransportOutputStreamWriter.java | 11 ++++++++--- .../bramble/sync/SyncSessionFactoryImpl.java | 4 ++-- .../bramble/test/TestTransportConnectionWriter.java | 5 +++++ 11 files changed, 50 insertions(+), 7 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java index 219f33efe..4ed5ba1ed 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java @@ -22,6 +22,11 @@ public interface TransportConnectionWriter { */ int getMaxIdleTime(); + /** + * Returns true if the transport is lossy and cheap. + */ + boolean isLossyAndCheap(); + /** * Returns an output stream for writing to the transport connection. */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java index 27aad596a..64a13f5e1 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java @@ -79,6 +79,11 @@ public abstract class AbstractDuplexTransportConnection return plugin.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return false; + } + @Override public OutputStream getOutputStream() throws IOException { return AbstractDuplexTransportConnection.this.getOutputStream(); diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java index 9df61968d..f7cf1e801 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java @@ -15,6 +15,12 @@ import javax.annotation.Nullable; @NotNullByDefault public interface SimplexPlugin extends Plugin { + /** + * Returns true if the transport is likely to lose streams and the cost of + * transmitting redundant copies of data is cheap. + */ + boolean isLossyAndCheap(); + /** * Attempts to create and return a reader for the given transport * properties. Returns null if a reader cannot be created. diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index a19e211fb..e863089ff 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -16,7 +16,7 @@ public interface SyncSessionFactory { PriorityHandler handler); SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, - int maxLatency, StreamWriter streamWriter); + int maxLatency, boolean eager, StreamWriter streamWriter); SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, int maxLatency, int maxIdleTime, StreamWriter streamWriter, diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java index a5ad6dfc8..9bec08193 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java @@ -71,8 +71,10 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); + // Use eager retransmission if the transport is lossy and cheap return syncSessionFactory.createSimplexOutgoingSession(c, - ctx.getTransportId(), w.getMaxLatency(), streamWriter); + ctx.getTransportId(), w.getMaxLatency(), w.isLossyAndCheap(), + streamWriter); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java index 356186355..8e8d35488 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java @@ -92,6 +92,11 @@ abstract class AbstractRemovableDrivePlugin implements SimplexPlugin { throw new UnsupportedOperationException(); } + @Override + public boolean isLossyAndCheap() { + return true; + } + @Override public TransportConnectionReader createReader(TransportProperties p) { try { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java index 2ab164784..c1d9c6748 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java @@ -36,6 +36,11 @@ class FileTransportWriter implements TransportConnectionWriter { return plugin.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return plugin.isLossyAndCheap(); + } + @Override public OutputStream getOutputStream() { return out; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java index 26f5c4969..563f306fa 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java @@ -106,6 +106,11 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl return delegate.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return delegate.isLossyAndCheap(); + } + @Override public OutputStream getOutputStream() throws IOException { return delegate.getOutputStream(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java index be40fbca0..db7ba5555 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java @@ -1,8 +1,8 @@ package org.briarproject.bramble.plugin.file; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.plugin.Plugin; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import java.io.OutputStream; import java.util.logging.Logger; @@ -17,10 +17,10 @@ class TransportOutputStreamWriter implements TransportConnectionWriter { private static final Logger LOG = getLogger(TransportOutputStreamWriter.class.getName()); - private final Plugin plugin; + private final SimplexPlugin plugin; private final OutputStream out; - TransportOutputStreamWriter(Plugin plugin, OutputStream out) { + TransportOutputStreamWriter(SimplexPlugin plugin, OutputStream out) { this.plugin = plugin; this.out = out; } @@ -35,6 +35,11 @@ class TransportOutputStreamWriter implements TransportConnectionWriter { return plugin.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return plugin.isLossyAndCheap(); + } + @Override public OutputStream getOutputStream() { return out; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 74ec3e51a..37fe1f9d1 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -60,12 +60,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { @Override public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, - int maxLatency, StreamWriter streamWriter) { + int maxLatency, boolean eager, StreamWriter streamWriter) { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, - maxLatency, false, streamWriter, recordWriter); + maxLatency, eager, streamWriter, recordWriter); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java index 238208207..bf382e8e5 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java @@ -35,6 +35,11 @@ public class TestTransportConnectionWriter return 60_000; } + @Override + public boolean isLossyAndCheap() { + return false; + } + @Override public OutputStream getOutputStream() { return out; From 32e9bf01ec29b2c6e1d9e641d202e229b02a133a Mon Sep 17 00:00:00 2001 From: akwizgran Date: Wed, 16 Jun 2021 11:35:29 +0100 Subject: [PATCH 6/8] Update DB method that gets total size of messages to send. --- .../bramble/api/db/DatabaseComponent.java | 20 +++---- .../org/briarproject/bramble/db/Database.java | 19 +++---- .../bramble/db/DatabaseComponentImpl.java | 18 +++--- .../briarproject/bramble/db/JdbcDatabase.java | 57 +++++++++---------- .../plugin/file/RemovableDriveWriterTask.java | 3 +- .../bramble/db/DatabaseComponentImplTest.java | 2 +- .../bramble/db/JdbcDatabaseTest.java | 30 +++++----- 7 files changed, 71 insertions(+), 78 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 53ed7416a..0d055641a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -314,16 +314,6 @@ public interface DatabaseComponent extends TransactionManager { */ Message getMessage(Transaction txn, MessageId m) throws DbException; - /** - * Returns the total length, including headers, of any messages that are - * eligible to be sent to the given contact via a transport with the given - * max latency. - *

- * Read-only. - */ - long getMessageBytesToSend(Transaction txn, ContactId c, int maxLatency) - throws DbException; - /** * Returns the IDs of all delivered messages in the given group. *

@@ -469,6 +459,16 @@ public interface DatabaseComponent extends TransactionManager { Map getUnackedMessagesToSend(Transaction txn, ContactId c) throws DbException; + /** + * Returns the total length, including headers, of all messages that are + * eligible to be sent to the given contact. This may include messages + * that have already been sent and are not yet due for retransmission. + *

+ * Read-only. + */ + long getUnackedMessageBytesToSend(Transaction txn, ContactId c) + throws DbException; + /** * Returns the next time (in milliseconds since the Unix epoch) when a * message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index cb386ab5c..64f54f19b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -357,16 +357,6 @@ interface Database { */ Message getMessage(T txn, MessageId m) throws DbException; - /** - * Returns the total length, including headers, of any messages that are - * eligible to be sent to the given contact via a transport with the given - * max latency. - *

- * Read-only. - */ - long getMessageBytesToSend(T txn, ContactId c, int maxLatency) - throws DbException; - /** * Returns the IDs and states of all dependencies of the given message. * For missing dependencies and dependencies in other groups, the state @@ -518,6 +508,15 @@ interface Database { Map getUnackedMessagesToSend(T txn, ContactId c) throws DbException; + /** + * Returns the total length, including headers, of all messages that are + * eligible to be sent to the given contact. This may include messages + * that have already been sent and are not yet due for retransmission. + *

+ * Read-only. + */ + long getUnackedMessageBytesToSend(T txn, ContactId c) throws DbException; + /** * Returns the IDs of any messages that need to be validated. *

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index d4c8ef682..c92245acd 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -608,15 +608,6 @@ class DatabaseComponentImpl implements DatabaseComponent { return db.getMessage(txn, m); } - @Override - public long getMessageBytesToSend(Transaction transaction, ContactId c, - int maxLatency) throws DbException { - T txn = unbox(transaction); - if (!db.containsContact(txn, c)) - throw new NoSuchContactException(); - return db.getMessageBytesToSend(txn, c, maxLatency); - } - @Override public Collection getMessageIds(Transaction transaction, GroupId g) throws DbException { @@ -750,6 +741,15 @@ class DatabaseComponentImpl implements DatabaseComponent { return db.getUnackedMessagesToSend(txn, c); } + @Override + public long getUnackedMessageBytesToSend(Transaction transaction, + ContactId c) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.getUnackedMessageBytesToSend(txn, c); + } + @Override public Map getMessageDependencies( Transaction transaction, MessageId m) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 32212193b..c77010b82 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -1788,37 +1788,6 @@ abstract class JdbcDatabase implements Database { } } - @Override - public long getMessageBytesToSend(Connection txn, ContactId c, - int maxLatency) throws DbException { - long now = clock.currentTimeMillis(); - long eta = now + maxLatency; - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT SUM(length) FROM statuses" - + " WHERE contactId = ? AND state = ?" - + " AND groupShared = TRUE AND messageShared = TRUE" - + " AND deleted = FALSE AND seen = FALSE" - + " AND (expiry <= ? OR eta > ?)"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setInt(2, DELIVERED.getValue()); - ps.setLong(3, now); - ps.setLong(4, eta); - rs = ps.executeQuery(); - rs.next(); - long total = rs.getInt(1); - rs.close(); - ps.close(); - return total; - } catch (SQLException e) { - tryToClose(rs, LOG, WARNING); - tryToClose(ps, LOG, WARNING); - throw new DbException(e); - } - } - @Override public Collection getMessageIds(Connection txn, GroupId g) throws DbException { @@ -2297,6 +2266,32 @@ abstract class JdbcDatabase implements Database { } } + @Override + public long getUnackedMessageBytesToSend(Connection txn, ContactId c) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT SUM(length) FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE AND seen = FALSE"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + rs = ps.executeQuery(); + rs.next(); + long total = rs.getInt(1); + rs.close(); + ps.close(); + return total; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public Collection getMessagesToValidate(Connection txn) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java index 563f306fa..1055f9de9 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java @@ -60,10 +60,9 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl setSuccess(false); return; } - int maxLatency = plugin.getMaxLatency(); try { setTotal(db.transactionWithResult(true, txn -> - db.getMessageBytesToSend(txn, contactId, maxLatency))); + db.getUnackedMessageBytesToSend(txn, contactId))); } catch (DbException e) { logException(LOG, WARNING, e); registry.removeWriter(this); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index 4b53a781d..a46f893ea 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -358,7 +358,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { try { db.transaction(true, transaction -> - db.getMessageBytesToSend(transaction, contactId, 123)); + db.getUnackedMessageBytesToSend(transaction, contactId)); fail(); } catch (NoSuchContactException expected) { // Expected diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index 2da33c18d..db6b3411b 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -228,7 +228,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // Changing the status to seen = true should make the message unsendable db.raiseSeenFlag(txn, contactId, messageId); @@ -236,7 +236,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -261,7 +261,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Marking the message delivered should make it sendable db.setMessageState(txn, messageId, DELIVERED); @@ -270,7 +270,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // Marking the message invalid should make it unsendable db.setMessageState(txn, messageId, INVALID); @@ -278,7 +278,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Marking the message pending should make it unsendable db.setMessageState(txn, messageId, PENDING); @@ -286,7 +286,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -310,7 +310,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Making the group visible should not make the message sendable db.addGroupVisibility(txn, contactId, groupId, false); @@ -318,7 +318,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Sharing the group should make the message sendable db.setGroupVisibility(txn, contactId, groupId, true); @@ -327,7 +327,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // Unsharing the group should make the message unsendable db.setGroupVisibility(txn, contactId, groupId, false); @@ -335,7 +335,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Making the group invisible should make the message unsendable db.removeGroupVisibility(txn, contactId, groupId); @@ -343,7 +343,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -368,7 +368,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Sharing the message should make it sendable db.setMessageShared(txn, messageId, true); @@ -377,7 +377,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -402,14 +402,14 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { MAX_LATENCY); assertTrue(ids.isEmpty()); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // The message is just the right size to send ids = db.getMessagesToSend(txn, contactId, message.getRawLength(), MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); From d5853e84036842e24b2c17c54ac01fe69bc9c5f7 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Wed, 16 Jun 2021 12:26:54 +0100 Subject: [PATCH 7/8] Add integration test for eager retransmission. --- .../test/TestDuplexTransportConnection.java | 2 +- .../test/TestTransportConnectionWriter.java | 7 +- .../SimplexMessagingIntegrationTest.java | 64 ++++++++++++++++--- .../briar/test/BriarIntegrationTest.java | 4 +- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java index 0ed6f9045..83ec8129e 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java @@ -25,7 +25,7 @@ public class TestDuplexTransportConnection @SuppressWarnings("WeakerAccess") public TestDuplexTransportConnection(InputStream in, OutputStream out) { reader = new TestTransportConnectionReader(in); - writer = new TestTransportConnectionWriter(out); + writer = new TestTransportConnectionWriter(out, false); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java index bf382e8e5..44e570fd7 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java @@ -15,10 +15,13 @@ public class TestTransportConnectionWriter implements TransportConnectionWriter { private final OutputStream out; + private final boolean lossyAndCheap; private final CountDownLatch disposed = new CountDownLatch(1); - public TestTransportConnectionWriter(OutputStream out) { + public TestTransportConnectionWriter(OutputStream out, + boolean lossyAndCheap) { this.out = out; + this.lossyAndCheap = lossyAndCheap; } public CountDownLatch getDisposedLatch() { @@ -37,7 +40,7 @@ public class TestTransportConnectionWriter @Override public boolean isLossyAndCheap() { - return false; + return lossyAndCheap; } @Override diff --git a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java index 16e540e95..aedefb63a 100644 --- a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java +++ b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java @@ -11,7 +11,9 @@ import org.briarproject.bramble.api.identity.IdentityManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; +import org.briarproject.bramble.api.sync.event.MessagesSentEvent; import org.briarproject.bramble.test.TestDatabaseConfigModule; import org.briarproject.bramble.test.TestTransportConnectionReader; import org.briarproject.bramble.test.TestTransportConnectionWriter; @@ -71,7 +73,16 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { } @Test - public void testWriteAndRead() throws Exception { + public void testWriteAndReadWithLazyRetransmission() throws Exception { + testWriteAndRead(false); + } + + @Test + public void testWriteAndReadWithEagerRetransmission() throws Exception { + testWriteAndRead(true); + } + + private void testWriteAndRead(boolean eager) throws Exception { // Create the identities Identity aliceIdentity = alice.getIdentityManager().createIdentity("Alice"); @@ -86,16 +97,20 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { bob.getEventBus().addListener(listener); // Alice sends a private message to Bob sendMessage(alice, bobId); - // Sync Alice's client versions and transport properties - read(bob, write(alice, bobId), 2); - // Sync Bob's client versions and transport properties - read(alice, write(bob, aliceId), 2); - // Sync the private message and the attachment - read(bob, write(alice, bobId), 2); + // Sync Alice's client versions + read(bob, write(alice, bobId, eager, 1), 1); + // Sync Bob's client versions + read(alice, write(bob, aliceId, eager, 1), 1); + // Sync Alice's client versions, the private message and the attachment + read(bob, write(alice, bobId, eager, 3), 3); // Bob should have received the private message assertTrue(listener.messageAdded); // Bob should have received the attachment assertTrue(listener.attachmentAdded); + // Sync messages from Alice to Bob again. If using eager + // retransmission, the three unacked messages should be sent again. + // They're all duplicates, so no further deliveries should occur + read(bob, write(alice, bobId, eager, eager ? 3 : 0), 0); } private ContactId setUp(SimplexMessagingIntegrationTestComponent device, @@ -149,15 +164,24 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { } private byte[] write(SimplexMessagingIntegrationTestComponent device, - ContactId contactId) throws Exception { + ContactId contactId, boolean eager, int transmissions) + throws Exception { + // Listen for message transmissions + MessageTransmissionListener listener = + new MessageTransmissionListener(transmissions); + device.getEventBus().addListener(listener); // Write the outgoing stream ByteArrayOutputStream out = new ByteArrayOutputStream(); TestTransportConnectionWriter writer = - new TestTransportConnectionWriter(out); + new TestTransportConnectionWriter(out, eager); device.getConnectionManager().manageOutgoingConnection(contactId, SIMPLEX_TRANSPORT_ID, writer); // Wait for the writer to be disposed writer.getDisposedLatch().await(TIMEOUT_MS, MILLISECONDS); + // Check that the expected number of messages were sent + assertTrue(listener.sent.await(TIMEOUT_MS, MILLISECONDS)); + // Clean up the listener + device.getEventBus().removeListener(listener); // Return the contents of the stream return out.toByteArray(); } @@ -178,6 +202,24 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { deleteTestDirectory(testDir); } + @NotNullByDefault + private static class MessageTransmissionListener implements EventListener { + + private final CountDownLatch sent; + + private MessageTransmissionListener(int transmissions) { + sent = new CountDownLatch(transmissions); + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof MessagesSentEvent) { + MessagesSentEvent m = (MessagesSentEvent) e; + for (MessageId ignored : m.getMessageIds()) sent.countDown(); + } + } + } + @NotNullByDefault private static class MessageDeliveryListener implements EventListener { @@ -191,7 +233,9 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { public void eventOccurred(Event e) { if (e instanceof MessageStateChangedEvent) { MessageStateChangedEvent m = (MessageStateChangedEvent) e; - if (m.getState().equals(DELIVERED)) delivered.countDown(); + if (!m.isLocal() && m.getState().equals(DELIVERED)) { + delivered.countDown(); + } } } } diff --git a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java index 0bcdab2a9..e493b4fa5 100644 --- a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java +++ b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java @@ -429,7 +429,7 @@ public abstract class BriarIntegrationTest Date: Wed, 16 Jun 2021 16:26:29 +0100 Subject: [PATCH 8/8] Add comment explaining second client versioning message. --- .../briar/messaging/SimplexMessagingIntegrationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java index aedefb63a..2ac9fff1b 100644 --- a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java +++ b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java @@ -101,7 +101,8 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { read(bob, write(alice, bobId, eager, 1), 1); // Sync Bob's client versions read(alice, write(bob, aliceId, eager, 1), 1); - // Sync Alice's client versions, the private message and the attachment + // Sync Alice's second client versioning update (with the active flag + // raised), the private message and the attachment read(bob, write(alice, bobId, eager, 3), 3); // Bob should have received the private message assertTrue(listener.messageAdded);