From 7a6d07598495a2dea0b8b70587d1575d61bf6aa3 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 4 Aug 2022 15:50:05 +0100 Subject: [PATCH] Don't repeatedly ack the same messages. --- .../bramble/api/db/DatabaseComponent.java | 8 +- .../bramble/db/DatabaseComponentImpl.java | 4 +- .../bramble/sync/MailboxOutgoingSession.java | 32 +++++--- .../bramble/db/DatabaseComponentImplTest.java | 2 +- .../sync/MailboxOutgoingSessionTest.java | 73 +++++++++---------- 5 files changed, 63 insertions(+), 56 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index f68d666d6..ff43c7272 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -349,13 +349,13 @@ public interface DatabaseComponent extends TransactionManager { Metadata query) throws DbException; /** - * Returns the IDs of some messages received from the given contact that - * need to be acknowledged, up to the given number of messages. + * Returns the IDs of all messages received from the given contact that + * need to be acknowledged. *

* Read-only. */ - Collection getMessagesToAck(Transaction txn, ContactId c, - int maxMessages) throws DbException; + Collection getMessagesToAck(Transaction txn, ContactId c) + throws DbException; /** * Returns the IDs of some messages that are eligible to be sent to the diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 01c1497fe..ae8b9df26 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -620,11 +620,11 @@ class DatabaseComponentImpl implements DatabaseComponent { @Override public Collection getMessagesToAck(Transaction transaction, - ContactId c, int maxMessages) throws DbException { + ContactId c) throws DbException { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - return db.getMessagesToAck(txn, c, maxMessages); + return db.getMessagesToAck(txn, c, Integer.MAX_VALUE); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java index 3dcf95a3a..afe7a72a8 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java @@ -14,7 +14,9 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; @@ -61,26 +63,32 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { @Override void sendAcks() throws DbException, IOException { - while (!isInterrupted()) { - Collection idsToAck = loadMessageIdsToAck(); - if (idsToAck.isEmpty()) break; - recordWriter.writeAck(new Ack(idsToAck)); - sessionRecord.onAckSent(idsToAck); + List idsToAck = loadMessageIdsToAck(); + int idsSent = 0; + while (idsSent < idsToAck.size() && !isInterrupted()) { + int idsRemaining = idsToAck.size() - idsSent; + long capacity = getRemainingCapacity(); + long idCapacity = + (capacity - RECORD_HEADER_BYTES) / MessageId.LENGTH; + if (idCapacity == 0) break; // Out of capacity + int idsInRecord = (int) min(idCapacity, MAX_MESSAGE_IDS); + int idsToSend = min(idsRemaining, idsInRecord); + List acked = + idsToAck.subList(idsSent, idsSent + idsToSend); + recordWriter.writeAck(new Ack(acked)); + sessionRecord.onAckSent(acked); LOG.info("Sent ack"); + idsSent += idsToSend; } } - private Collection loadMessageIdsToAck() throws DbException { - long idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES) - / MessageId.LENGTH; - if (idCapacity <= 0) return emptyList(); // Out of capacity - int maxMessageIds = (int) min(idCapacity, MAX_MESSAGE_IDS); + private List loadMessageIdsToAck() throws DbException { Collection ids = db.transactionWithResult(true, txn -> - db.getMessagesToAck(txn, contactId, maxMessageIds)); + db.getMessagesToAck(txn, contactId)); if (LOG.isLoggable(INFO)) { LOG.info(ids.size() + " messages to ack"); } - return ids; + return new ArrayList<>(ids); } private long getRemainingCapacity() { diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index 2e342186a..25e85afc5 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -389,7 +389,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { try { db.transaction(true, transaction -> - db.getMessagesToAck(transaction, contactId, 123)); + db.getMessagesToAck(transaction, contactId)); fail(); } catch (NoSuchContactException expected) { // Expected diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java index 814599358..a623cc8af 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java @@ -14,11 +14,13 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.test.BrambleMockTestCase; +import org.briarproject.bramble.test.CaptureArgumentAction; import org.briarproject.bramble.test.DbExpectations; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -68,13 +70,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { oneOf(eventBus).addListener(session); // Send the protocol versions oneOf(recordWriter).writeVersions(with(any(Versions.class))); - // Calculate capacity for acks - oneOf(recordWriter).getBytesWritten(); - will(returnValue((long) versionRecordBytes)); // No messages to ack oneOf(db).transactionWithResult(with(true), withDbCallable(noAckIdTxn)); - oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS); + oneOf(db).getMessagesToAck(noAckIdTxn, contactId); will(returnValue(emptyList())); // Calculate capacity for messages oneOf(recordWriter).getBytesWritten(); @@ -106,7 +105,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { MAX_FILE_PAYLOAD_BYTES); Transaction ackIdTxn = new Transaction(null, true); - Transaction noAckIdTxn = new Transaction(null, true); Transaction msgIdTxn = new Transaction(null, true); Transaction msgTxn = new Transaction(null, true); @@ -114,28 +112,24 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { long capacityForMessages = MAX_FILE_PAYLOAD_BYTES - versionRecordBytes - ackRecordBytes; + AtomicReference ack = new AtomicReference<>(); + context.checking(new DbExpectations() {{ // Add listener oneOf(eventBus).addListener(session); // Send the protocol versions oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // Load the IDs to ack + oneOf(db).transactionWithResult(with(true), + withDbCallable(ackIdTxn)); + oneOf(db).getMessagesToAck(ackIdTxn, contactId); + will(returnValue(singletonList(message.getId()))); // Calculate capacity for acks oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes)); - // One message to ack - oneOf(db).transactionWithResult(with(true), - withDbCallable(ackIdTxn)); - oneOf(db).getMessagesToAck(ackIdTxn, contactId, MAX_MESSAGE_IDS); - will(returnValue(singletonList(message.getId()))); // Send the ack - oneOf(recordWriter).getBytesWritten(); - will(returnValue((long) versionRecordBytes)); oneOf(recordWriter).writeAck(with(any(Ack.class))); - // No more messages to ack - oneOf(db).transactionWithResult(with(true), - withDbCallable(noAckIdTxn)); - oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS); - will(returnValue(emptyList())); + will(new CaptureArgumentAction<>(ack, Ack.class, 0)); // Calculate capacity for messages oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes + ackRecordBytes)); @@ -162,6 +156,7 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { assertEquals(singletonList(message.getId()), sessionRecord.getAckedIds()); + assertEquals(singletonList(message.getId()), ack.get().getMessageIds()); assertEquals(singletonList(message1.getId()), sessionRecord.getSentIds()); } @@ -178,48 +173,50 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { eventBus, contactId, transportId, MAX_LATENCY, streamWriter, recordWriter, sessionRecord, capacity); - Transaction ackIdTxn1 = new Transaction(null, true); - Transaction ackIdTxn2 = new Transaction(null, true); + Transaction ackIdTxn = new Transaction(null, true); int firstAckRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS; int secondAckRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH; - List idsInFirstAck = new ArrayList<>(MAX_MESSAGE_IDS); - for (int i = 0; i < MAX_MESSAGE_IDS; i++) { - idsInFirstAck.add(new MessageId(getRandomId())); + // There are MAX_MESSAGE_IDS + 2 messages that need to be acked, but + // only enough capacity to ack MAX_MESSAGE_IDS + 1 messages + List idsToAck = new ArrayList<>(MAX_MESSAGE_IDS + 2); + for (int i = 0; i < MAX_MESSAGE_IDS + 2; i++) { + idsToAck.add(new MessageId(getRandomId())); } + // The first ack contains MAX_MESSAGE_IDS IDs + List idsInFirstAck = idsToAck.subList(0, MAX_MESSAGE_IDS); + // The second ack contains one ID List idsInSecondAck = - singletonList(new MessageId(getRandomId())); - List allIds = new ArrayList<>(MAX_MESSAGE_IDS + 1); - allIds.addAll(idsInFirstAck); - allIds.addAll(idsInSecondAck); + idsToAck.subList(MAX_MESSAGE_IDS, MAX_MESSAGE_IDS + 1); + List idsAcked = idsToAck.subList(0, MAX_MESSAGE_IDS + 1); + + AtomicReference firstAck = new AtomicReference<>(); + AtomicReference secondAck = new AtomicReference<>(); context.checking(new DbExpectations() {{ // Add listener oneOf(eventBus).addListener(session); // Send the protocol versions oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // Load the IDs to ack + oneOf(db).transactionWithResult(with(true), + withDbCallable(ackIdTxn)); + oneOf(db).getMessagesToAck(ackIdTxn, contactId); + will(returnValue(idsToAck)); // Calculate capacity for acks oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes)); - // Load the IDs for the first ack record - oneOf(db).transactionWithResult(with(true), - withDbCallable(ackIdTxn1)); - oneOf(db).getMessagesToAck(ackIdTxn1, contactId, MAX_MESSAGE_IDS); - will(returnValue(idsInFirstAck)); // Send the first ack record oneOf(recordWriter).writeAck(with(any(Ack.class))); + will(new CaptureArgumentAction<>(firstAck, Ack.class, 0)); // Calculate remaining capacity for acks oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes + firstAckRecordBytes)); - // Load the IDs for the second ack record - oneOf(db).transactionWithResult(with(true), - withDbCallable(ackIdTxn2)); - oneOf(db).getMessagesToAck(ackIdTxn2, contactId, 1); - will(returnValue(idsInSecondAck)); // Send the second ack record oneOf(recordWriter).writeAck(with(any(Ack.class))); + will(new CaptureArgumentAction<>(secondAck, Ack.class, 0)); // Not enough capacity left for another ack oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes + firstAckRecordBytes @@ -236,7 +233,9 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { session.run(); - assertEquals(allIds, sessionRecord.getAckedIds()); + assertEquals(idsAcked, sessionRecord.getAckedIds()); + assertEquals(idsInFirstAck, firstAck.get().getMessageIds()); + assertEquals(idsInSecondAck, secondAck.get().getMessageIds()); assertEquals(emptyList(), sessionRecord.getSentIds()); } }