mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-18 05:39:53 +01:00
Don't repeatedly ack the same messages.
This commit is contained in:
@@ -349,13 +349,13 @@ public interface DatabaseComponent extends TransactionManager {
|
|||||||
Metadata query) throws DbException;
|
Metadata query) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the IDs of some messages received from the given contact that
|
* Returns the IDs of all messages received from the given contact that
|
||||||
* need to be acknowledged, up to the given number of messages.
|
* need to be acknowledged.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Read-only.
|
* Read-only.
|
||||||
*/
|
*/
|
||||||
Collection<MessageId> getMessagesToAck(Transaction txn, ContactId c,
|
Collection<MessageId> getMessagesToAck(Transaction txn, ContactId c)
|
||||||
int maxMessages) throws DbException;
|
throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the IDs of some messages that are eligible to be sent to the
|
* Returns the IDs of some messages that are eligible to be sent to the
|
||||||
|
|||||||
@@ -620,11 +620,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<MessageId> getMessagesToAck(Transaction transaction,
|
public Collection<MessageId> getMessagesToAck(Transaction transaction,
|
||||||
ContactId c, int maxMessages) throws DbException {
|
ContactId c) throws DbException {
|
||||||
T txn = unbox(transaction);
|
T txn = unbox(transaction);
|
||||||
if (!db.containsContact(txn, c))
|
if (!db.containsContact(txn, c))
|
||||||
throw new NoSuchContactException();
|
throw new NoSuchContactException();
|
||||||
return db.getMessagesToAck(txn, c, maxMessages);
|
return db.getMessagesToAck(txn, c, Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -14,7 +14,9 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
|||||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
@@ -61,26 +63,32 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void sendAcks() throws DbException, IOException {
|
void sendAcks() throws DbException, IOException {
|
||||||
while (!isInterrupted()) {
|
List<MessageId> idsToAck = loadMessageIdsToAck();
|
||||||
Collection<MessageId> idsToAck = loadMessageIdsToAck();
|
int idsSent = 0;
|
||||||
if (idsToAck.isEmpty()) break;
|
while (idsSent < idsToAck.size() && !isInterrupted()) {
|
||||||
recordWriter.writeAck(new Ack(idsToAck));
|
int idsRemaining = idsToAck.size() - idsSent;
|
||||||
sessionRecord.onAckSent(idsToAck);
|
long capacity = getRemainingCapacity();
|
||||||
|
long idCapacity =
|
||||||
|
(capacity - RECORD_HEADER_BYTES) / MessageId.LENGTH;
|
||||||
|
if (idCapacity == 0) break; // Out of capacity
|
||||||
|
int idsInRecord = (int) min(idCapacity, MAX_MESSAGE_IDS);
|
||||||
|
int idsToSend = min(idsRemaining, idsInRecord);
|
||||||
|
List<MessageId> acked =
|
||||||
|
idsToAck.subList(idsSent, idsSent + idsToSend);
|
||||||
|
recordWriter.writeAck(new Ack(acked));
|
||||||
|
sessionRecord.onAckSent(acked);
|
||||||
LOG.info("Sent ack");
|
LOG.info("Sent ack");
|
||||||
|
idsSent += idsToSend;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Collection<MessageId> loadMessageIdsToAck() throws DbException {
|
private List<MessageId> loadMessageIdsToAck() throws DbException {
|
||||||
long idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES)
|
|
||||||
/ MessageId.LENGTH;
|
|
||||||
if (idCapacity <= 0) return emptyList(); // Out of capacity
|
|
||||||
int maxMessageIds = (int) min(idCapacity, MAX_MESSAGE_IDS);
|
|
||||||
Collection<MessageId> ids = db.transactionWithResult(true, txn ->
|
Collection<MessageId> ids = db.transactionWithResult(true, txn ->
|
||||||
db.getMessagesToAck(txn, contactId, maxMessageIds));
|
db.getMessagesToAck(txn, contactId));
|
||||||
if (LOG.isLoggable(INFO)) {
|
if (LOG.isLoggable(INFO)) {
|
||||||
LOG.info(ids.size() + " messages to ack");
|
LOG.info(ids.size() + " messages to ack");
|
||||||
}
|
}
|
||||||
return ids;
|
return new ArrayList<>(ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getRemainingCapacity() {
|
private long getRemainingCapacity() {
|
||||||
|
|||||||
@@ -389,7 +389,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
db.transaction(true, transaction ->
|
db.transaction(true, transaction ->
|
||||||
db.getMessagesToAck(transaction, contactId, 123));
|
db.getMessagesToAck(transaction, contactId));
|
||||||
fail();
|
fail();
|
||||||
} catch (NoSuchContactException expected) {
|
} catch (NoSuchContactException expected) {
|
||||||
// Expected
|
// Expected
|
||||||
|
|||||||
@@ -14,11 +14,13 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
|||||||
import org.briarproject.bramble.api.sync.Versions;
|
import org.briarproject.bramble.api.sync.Versions;
|
||||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||||
|
import org.briarproject.bramble.test.CaptureArgumentAction;
|
||||||
import org.briarproject.bramble.test.DbExpectations;
|
import org.briarproject.bramble.test.DbExpectations;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
@@ -68,13 +70,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
oneOf(eventBus).addListener(session);
|
oneOf(eventBus).addListener(session);
|
||||||
// Send the protocol versions
|
// Send the protocol versions
|
||||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||||
// Calculate capacity for acks
|
|
||||||
oneOf(recordWriter).getBytesWritten();
|
|
||||||
will(returnValue((long) versionRecordBytes));
|
|
||||||
// No messages to ack
|
// No messages to ack
|
||||||
oneOf(db).transactionWithResult(with(true),
|
oneOf(db).transactionWithResult(with(true),
|
||||||
withDbCallable(noAckIdTxn));
|
withDbCallable(noAckIdTxn));
|
||||||
oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS);
|
oneOf(db).getMessagesToAck(noAckIdTxn, contactId);
|
||||||
will(returnValue(emptyList()));
|
will(returnValue(emptyList()));
|
||||||
// Calculate capacity for messages
|
// Calculate capacity for messages
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
@@ -106,7 +105,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
MAX_FILE_PAYLOAD_BYTES);
|
MAX_FILE_PAYLOAD_BYTES);
|
||||||
|
|
||||||
Transaction ackIdTxn = new Transaction(null, true);
|
Transaction ackIdTxn = new Transaction(null, true);
|
||||||
Transaction noAckIdTxn = new Transaction(null, true);
|
|
||||||
Transaction msgIdTxn = new Transaction(null, true);
|
Transaction msgIdTxn = new Transaction(null, true);
|
||||||
Transaction msgTxn = new Transaction(null, true);
|
Transaction msgTxn = new Transaction(null, true);
|
||||||
|
|
||||||
@@ -114,28 +112,24 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
long capacityForMessages =
|
long capacityForMessages =
|
||||||
MAX_FILE_PAYLOAD_BYTES - versionRecordBytes - ackRecordBytes;
|
MAX_FILE_PAYLOAD_BYTES - versionRecordBytes - ackRecordBytes;
|
||||||
|
|
||||||
|
AtomicReference<Ack> ack = new AtomicReference<>();
|
||||||
|
|
||||||
context.checking(new DbExpectations() {{
|
context.checking(new DbExpectations() {{
|
||||||
// Add listener
|
// Add listener
|
||||||
oneOf(eventBus).addListener(session);
|
oneOf(eventBus).addListener(session);
|
||||||
// Send the protocol versions
|
// Send the protocol versions
|
||||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||||
|
// Load the IDs to ack
|
||||||
|
oneOf(db).transactionWithResult(with(true),
|
||||||
|
withDbCallable(ackIdTxn));
|
||||||
|
oneOf(db).getMessagesToAck(ackIdTxn, contactId);
|
||||||
|
will(returnValue(singletonList(message.getId())));
|
||||||
// Calculate capacity for acks
|
// Calculate capacity for acks
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes));
|
will(returnValue((long) versionRecordBytes));
|
||||||
// One message to ack
|
|
||||||
oneOf(db).transactionWithResult(with(true),
|
|
||||||
withDbCallable(ackIdTxn));
|
|
||||||
oneOf(db).getMessagesToAck(ackIdTxn, contactId, MAX_MESSAGE_IDS);
|
|
||||||
will(returnValue(singletonList(message.getId())));
|
|
||||||
// Send the ack
|
// Send the ack
|
||||||
oneOf(recordWriter).getBytesWritten();
|
|
||||||
will(returnValue((long) versionRecordBytes));
|
|
||||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||||
// No more messages to ack
|
will(new CaptureArgumentAction<>(ack, Ack.class, 0));
|
||||||
oneOf(db).transactionWithResult(with(true),
|
|
||||||
withDbCallable(noAckIdTxn));
|
|
||||||
oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS);
|
|
||||||
will(returnValue(emptyList()));
|
|
||||||
// Calculate capacity for messages
|
// Calculate capacity for messages
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes + ackRecordBytes));
|
will(returnValue((long) versionRecordBytes + ackRecordBytes));
|
||||||
@@ -162,6 +156,7 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
|
|
||||||
assertEquals(singletonList(message.getId()),
|
assertEquals(singletonList(message.getId()),
|
||||||
sessionRecord.getAckedIds());
|
sessionRecord.getAckedIds());
|
||||||
|
assertEquals(singletonList(message.getId()), ack.get().getMessageIds());
|
||||||
assertEquals(singletonList(message1.getId()),
|
assertEquals(singletonList(message1.getId()),
|
||||||
sessionRecord.getSentIds());
|
sessionRecord.getSentIds());
|
||||||
}
|
}
|
||||||
@@ -178,48 +173,50 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
eventBus, contactId, transportId, MAX_LATENCY,
|
eventBus, contactId, transportId, MAX_LATENCY,
|
||||||
streamWriter, recordWriter, sessionRecord, capacity);
|
streamWriter, recordWriter, sessionRecord, capacity);
|
||||||
|
|
||||||
Transaction ackIdTxn1 = new Transaction(null, true);
|
Transaction ackIdTxn = new Transaction(null, true);
|
||||||
Transaction ackIdTxn2 = new Transaction(null, true);
|
|
||||||
|
|
||||||
int firstAckRecordBytes =
|
int firstAckRecordBytes =
|
||||||
RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS;
|
RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS;
|
||||||
int secondAckRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH;
|
int secondAckRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH;
|
||||||
|
|
||||||
List<MessageId> idsInFirstAck = new ArrayList<>(MAX_MESSAGE_IDS);
|
// There are MAX_MESSAGE_IDS + 2 messages that need to be acked, but
|
||||||
for (int i = 0; i < MAX_MESSAGE_IDS; i++) {
|
// only enough capacity to ack MAX_MESSAGE_IDS + 1 messages
|
||||||
idsInFirstAck.add(new MessageId(getRandomId()));
|
List<MessageId> idsToAck = new ArrayList<>(MAX_MESSAGE_IDS + 2);
|
||||||
|
for (int i = 0; i < MAX_MESSAGE_IDS + 2; i++) {
|
||||||
|
idsToAck.add(new MessageId(getRandomId()));
|
||||||
}
|
}
|
||||||
|
// The first ack contains MAX_MESSAGE_IDS IDs
|
||||||
|
List<MessageId> idsInFirstAck = idsToAck.subList(0, MAX_MESSAGE_IDS);
|
||||||
|
// The second ack contains one ID
|
||||||
List<MessageId> idsInSecondAck =
|
List<MessageId> idsInSecondAck =
|
||||||
singletonList(new MessageId(getRandomId()));
|
idsToAck.subList(MAX_MESSAGE_IDS, MAX_MESSAGE_IDS + 1);
|
||||||
List<MessageId> allIds = new ArrayList<>(MAX_MESSAGE_IDS + 1);
|
List<MessageId> idsAcked = idsToAck.subList(0, MAX_MESSAGE_IDS + 1);
|
||||||
allIds.addAll(idsInFirstAck);
|
|
||||||
allIds.addAll(idsInSecondAck);
|
AtomicReference<Ack> firstAck = new AtomicReference<>();
|
||||||
|
AtomicReference<Ack> secondAck = new AtomicReference<>();
|
||||||
|
|
||||||
context.checking(new DbExpectations() {{
|
context.checking(new DbExpectations() {{
|
||||||
// Add listener
|
// Add listener
|
||||||
oneOf(eventBus).addListener(session);
|
oneOf(eventBus).addListener(session);
|
||||||
// Send the protocol versions
|
// Send the protocol versions
|
||||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||||
|
// Load the IDs to ack
|
||||||
|
oneOf(db).transactionWithResult(with(true),
|
||||||
|
withDbCallable(ackIdTxn));
|
||||||
|
oneOf(db).getMessagesToAck(ackIdTxn, contactId);
|
||||||
|
will(returnValue(idsToAck));
|
||||||
// Calculate capacity for acks
|
// Calculate capacity for acks
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes));
|
will(returnValue((long) versionRecordBytes));
|
||||||
// Load the IDs for the first ack record
|
|
||||||
oneOf(db).transactionWithResult(with(true),
|
|
||||||
withDbCallable(ackIdTxn1));
|
|
||||||
oneOf(db).getMessagesToAck(ackIdTxn1, contactId, MAX_MESSAGE_IDS);
|
|
||||||
will(returnValue(idsInFirstAck));
|
|
||||||
// Send the first ack record
|
// Send the first ack record
|
||||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||||
|
will(new CaptureArgumentAction<>(firstAck, Ack.class, 0));
|
||||||
// Calculate remaining capacity for acks
|
// Calculate remaining capacity for acks
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes));
|
will(returnValue((long) versionRecordBytes + firstAckRecordBytes));
|
||||||
// Load the IDs for the second ack record
|
|
||||||
oneOf(db).transactionWithResult(with(true),
|
|
||||||
withDbCallable(ackIdTxn2));
|
|
||||||
oneOf(db).getMessagesToAck(ackIdTxn2, contactId, 1);
|
|
||||||
will(returnValue(idsInSecondAck));
|
|
||||||
// Send the second ack record
|
// Send the second ack record
|
||||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||||
|
will(new CaptureArgumentAction<>(secondAck, Ack.class, 0));
|
||||||
// Not enough capacity left for another ack
|
// Not enough capacity left for another ack
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
|
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
|
||||||
@@ -236,7 +233,9 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
|
|
||||||
session.run();
|
session.run();
|
||||||
|
|
||||||
assertEquals(allIds, sessionRecord.getAckedIds());
|
assertEquals(idsAcked, sessionRecord.getAckedIds());
|
||||||
|
assertEquals(idsInFirstAck, firstAck.get().getMessageIds());
|
||||||
|
assertEquals(idsInSecondAck, secondAck.get().getMessageIds());
|
||||||
assertEquals(emptyList(), sessionRecord.getSentIds());
|
assertEquals(emptyList(), sessionRecord.getSentIds());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user