diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
index f68d666d6..ff43c7272 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
@@ -349,13 +349,13 @@ public interface DatabaseComponent extends TransactionManager {
Metadata query) throws DbException;
/**
- * Returns the IDs of some messages received from the given contact that
- * need to be acknowledged, up to the given number of messages.
+ * Returns the IDs of all messages received from the given contact that
+ * need to be acknowledged.
*
* Read-only.
*/
- Collection getMessagesToAck(Transaction txn, ContactId c,
- int maxMessages) throws DbException;
+ Collection getMessagesToAck(Transaction txn, ContactId c)
+ throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
index 01c1497fe..ae8b9df26 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
@@ -620,11 +620,11 @@ class DatabaseComponentImpl implements DatabaseComponent {
@Override
public Collection getMessagesToAck(Transaction transaction,
- ContactId c, int maxMessages) throws DbException {
+ ContactId c) throws DbException {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
- return db.getMessagesToAck(txn, c, maxMessages);
+ return db.getMessagesToAck(txn, c, Integer.MAX_VALUE);
}
@Override
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java
index 3dcf95a3a..afe7a72a8 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java
@@ -14,7 +14,9 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
@@ -61,26 +63,32 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
@Override
void sendAcks() throws DbException, IOException {
- while (!isInterrupted()) {
- Collection idsToAck = loadMessageIdsToAck();
- if (idsToAck.isEmpty()) break;
- recordWriter.writeAck(new Ack(idsToAck));
- sessionRecord.onAckSent(idsToAck);
+ List idsToAck = loadMessageIdsToAck();
+ int idsSent = 0;
+ while (idsSent < idsToAck.size() && !isInterrupted()) {
+ int idsRemaining = idsToAck.size() - idsSent;
+ long capacity = getRemainingCapacity();
+ long idCapacity =
+ (capacity - RECORD_HEADER_BYTES) / MessageId.LENGTH;
+ if (idCapacity == 0) break; // Out of capacity
+ int idsInRecord = (int) min(idCapacity, MAX_MESSAGE_IDS);
+ int idsToSend = min(idsRemaining, idsInRecord);
+ List acked =
+ idsToAck.subList(idsSent, idsSent + idsToSend);
+ recordWriter.writeAck(new Ack(acked));
+ sessionRecord.onAckSent(acked);
LOG.info("Sent ack");
+ idsSent += idsToSend;
}
}
- private Collection loadMessageIdsToAck() throws DbException {
- long idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES)
- / MessageId.LENGTH;
- if (idCapacity <= 0) return emptyList(); // Out of capacity
- int maxMessageIds = (int) min(idCapacity, MAX_MESSAGE_IDS);
+ private List loadMessageIdsToAck() throws DbException {
Collection ids = db.transactionWithResult(true, txn ->
- db.getMessagesToAck(txn, contactId, maxMessageIds));
+ db.getMessagesToAck(txn, contactId));
if (LOG.isLoggable(INFO)) {
LOG.info(ids.size() + " messages to ack");
}
- return ids;
+ return new ArrayList<>(ids);
}
private long getRemainingCapacity() {
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java
index 2e342186a..25e85afc5 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java
@@ -389,7 +389,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
try {
db.transaction(true, transaction ->
- db.getMessagesToAck(transaction, contactId, 123));
+ db.getMessagesToAck(transaction, contactId));
fail();
} catch (NoSuchContactException expected) {
// Expected
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java
index 814599358..a623cc8af 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java
@@ -14,11 +14,13 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.test.BrambleMockTestCase;
+import org.briarproject.bramble.test.CaptureArgumentAction;
import org.briarproject.bramble.test.DbExpectations;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@@ -68,13 +70,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
oneOf(eventBus).addListener(session);
// Send the protocol versions
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
- // Calculate capacity for acks
- oneOf(recordWriter).getBytesWritten();
- will(returnValue((long) versionRecordBytes));
// No messages to ack
oneOf(db).transactionWithResult(with(true),
withDbCallable(noAckIdTxn));
- oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS);
+ oneOf(db).getMessagesToAck(noAckIdTxn, contactId);
will(returnValue(emptyList()));
// Calculate capacity for messages
oneOf(recordWriter).getBytesWritten();
@@ -106,7 +105,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
MAX_FILE_PAYLOAD_BYTES);
Transaction ackIdTxn = new Transaction(null, true);
- Transaction noAckIdTxn = new Transaction(null, true);
Transaction msgIdTxn = new Transaction(null, true);
Transaction msgTxn = new Transaction(null, true);
@@ -114,28 +112,24 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
long capacityForMessages =
MAX_FILE_PAYLOAD_BYTES - versionRecordBytes - ackRecordBytes;
+ AtomicReference ack = new AtomicReference<>();
+
context.checking(new DbExpectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// Send the protocol versions
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
+ // Load the IDs to ack
+ oneOf(db).transactionWithResult(with(true),
+ withDbCallable(ackIdTxn));
+ oneOf(db).getMessagesToAck(ackIdTxn, contactId);
+ will(returnValue(singletonList(message.getId())));
// Calculate capacity for acks
oneOf(recordWriter).getBytesWritten();
will(returnValue((long) versionRecordBytes));
- // One message to ack
- oneOf(db).transactionWithResult(with(true),
- withDbCallable(ackIdTxn));
- oneOf(db).getMessagesToAck(ackIdTxn, contactId, MAX_MESSAGE_IDS);
- will(returnValue(singletonList(message.getId())));
// Send the ack
- oneOf(recordWriter).getBytesWritten();
- will(returnValue((long) versionRecordBytes));
oneOf(recordWriter).writeAck(with(any(Ack.class)));
- // No more messages to ack
- oneOf(db).transactionWithResult(with(true),
- withDbCallable(noAckIdTxn));
- oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS);
- will(returnValue(emptyList()));
+ will(new CaptureArgumentAction<>(ack, Ack.class, 0));
// Calculate capacity for messages
oneOf(recordWriter).getBytesWritten();
will(returnValue((long) versionRecordBytes + ackRecordBytes));
@@ -162,6 +156,7 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
assertEquals(singletonList(message.getId()),
sessionRecord.getAckedIds());
+ assertEquals(singletonList(message.getId()), ack.get().getMessageIds());
assertEquals(singletonList(message1.getId()),
sessionRecord.getSentIds());
}
@@ -178,48 +173,50 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter, sessionRecord, capacity);
- Transaction ackIdTxn1 = new Transaction(null, true);
- Transaction ackIdTxn2 = new Transaction(null, true);
+ Transaction ackIdTxn = new Transaction(null, true);
int firstAckRecordBytes =
RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS;
int secondAckRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH;
- List idsInFirstAck = new ArrayList<>(MAX_MESSAGE_IDS);
- for (int i = 0; i < MAX_MESSAGE_IDS; i++) {
- idsInFirstAck.add(new MessageId(getRandomId()));
+ // There are MAX_MESSAGE_IDS + 2 messages that need to be acked, but
+ // only enough capacity to ack MAX_MESSAGE_IDS + 1 messages
+ List idsToAck = new ArrayList<>(MAX_MESSAGE_IDS + 2);
+ for (int i = 0; i < MAX_MESSAGE_IDS + 2; i++) {
+ idsToAck.add(new MessageId(getRandomId()));
}
+ // The first ack contains MAX_MESSAGE_IDS IDs
+ List idsInFirstAck = idsToAck.subList(0, MAX_MESSAGE_IDS);
+ // The second ack contains one ID
List idsInSecondAck =
- singletonList(new MessageId(getRandomId()));
- List allIds = new ArrayList<>(MAX_MESSAGE_IDS + 1);
- allIds.addAll(idsInFirstAck);
- allIds.addAll(idsInSecondAck);
+ idsToAck.subList(MAX_MESSAGE_IDS, MAX_MESSAGE_IDS + 1);
+ List idsAcked = idsToAck.subList(0, MAX_MESSAGE_IDS + 1);
+
+ AtomicReference firstAck = new AtomicReference<>();
+ AtomicReference secondAck = new AtomicReference<>();
context.checking(new DbExpectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// Send the protocol versions
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
+ // Load the IDs to ack
+ oneOf(db).transactionWithResult(with(true),
+ withDbCallable(ackIdTxn));
+ oneOf(db).getMessagesToAck(ackIdTxn, contactId);
+ will(returnValue(idsToAck));
// Calculate capacity for acks
oneOf(recordWriter).getBytesWritten();
will(returnValue((long) versionRecordBytes));
- // Load the IDs for the first ack record
- oneOf(db).transactionWithResult(with(true),
- withDbCallable(ackIdTxn1));
- oneOf(db).getMessagesToAck(ackIdTxn1, contactId, MAX_MESSAGE_IDS);
- will(returnValue(idsInFirstAck));
// Send the first ack record
oneOf(recordWriter).writeAck(with(any(Ack.class)));
+ will(new CaptureArgumentAction<>(firstAck, Ack.class, 0));
// Calculate remaining capacity for acks
oneOf(recordWriter).getBytesWritten();
will(returnValue((long) versionRecordBytes + firstAckRecordBytes));
- // Load the IDs for the second ack record
- oneOf(db).transactionWithResult(with(true),
- withDbCallable(ackIdTxn2));
- oneOf(db).getMessagesToAck(ackIdTxn2, contactId, 1);
- will(returnValue(idsInSecondAck));
// Send the second ack record
oneOf(recordWriter).writeAck(with(any(Ack.class)));
+ will(new CaptureArgumentAction<>(secondAck, Ack.class, 0));
// Not enough capacity left for another ack
oneOf(recordWriter).getBytesWritten();
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
@@ -236,7 +233,9 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
session.run();
- assertEquals(allIds, sessionRecord.getAckedIds());
+ assertEquals(idsAcked, sessionRecord.getAckedIds());
+ assertEquals(idsInFirstAck, firstAck.get().getMessageIds());
+ assertEquals(idsInSecondAck, secondAck.get().getMessageIds());
assertEquals(emptyList(), sessionRecord.getSentIds());
}
}