From a960bfb2c164de2a9ec897d575c4fa917ac37cc9 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 10 Jun 2021 17:28:30 +0100 Subject: [PATCH] Add tests for eager retransmission. --- .../sync/SimplexOutgoingSessionTest.java | 120 ++++++++++++++++-- 1 file changed, 111 insertions(+), 9 deletions(-) diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index 0dd7baf2f..9bcf7cc3f 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -17,9 +17,14 @@ import org.briarproject.bramble.test.DbExpectations; import org.briarproject.bramble.test.ImmediateExecutor; import org.junit.Test; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.Executor; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getMessage; @@ -39,14 +44,19 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { private final Executor dbExecutor = new ImmediateExecutor(); private final ContactId contactId = getContactId(); private final TransportId transportId = getTransportId(); - private final Message message = getMessage(new GroupId(getRandomId())); - private final MessageId messageId = message.getId(); + private final Ack ack = + new Ack(singletonList(new MessageId(getRandomId()))); + private final Message message = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); + private final Message message1 = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); + Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -63,8 +73,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // No messages to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); - oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(noMsgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -75,12 +85,45 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { session.run(); } + @Test + public void testNothingToSendEagerly() throws Exception { + SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + true, streamWriter, recordWriter); + + Transaction noAckTxn = new Transaction(null, false); + Transaction noIdsTxn = new Transaction(null, true); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // No acks to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // No messages to send + oneOf(db).transactionWithResult(with(true), + withDbCallable(noIdsTxn)); + oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId); + will(returnValue(emptyMap())); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } + @Test public void testSomethingToSend() throws Exception { - Ack ack = new Ack(singletonList(messageId)); SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); + Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false); @@ -100,8 +143,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // One message to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(msgTxn)); - oneOf(db).generateBatch(with(msgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(msgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(singletonList(message))); oneOf(recordWriter).writeMessage(message); // No more acks @@ -112,8 +155,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // No more messages oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); - oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(noMsgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -123,4 +166,63 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { session.run(); } + + @Test + public void testSomethingToSendEagerly() throws Exception { + SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + true, streamWriter, recordWriter); + + Map unacked = new LinkedHashMap<>(); + unacked.put(message.getId(), message.getRawLength()); + unacked.put(message1.getId(), message1.getRawLength()); + + Transaction ackTxn = new Transaction(null, false); + Transaction noAckTxn = new Transaction(null, false); + Transaction idsTxn = new Transaction(null, true); + Transaction msgTxn = new Transaction(null, false); + Transaction msgTxn1 = new Transaction(null, false); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // One ack to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(ackTxn)); + oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(ack)); + oneOf(recordWriter).writeAck(ack); + // No more acks + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // Two messages to send + oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn)); + oneOf(db).getUnackedMessagesToSend(idsTxn, contactId); + will(returnValue(unacked)); + // Send the first message + oneOf(db).transactionWithResult(with(false), + withDbCallable(msgTxn)); + oneOf(db).generateBatch(msgTxn, contactId, + singletonList(message.getId()), MAX_LATENCY); + will(returnValue(singletonList(message))); + oneOf(recordWriter).writeMessage(message); + // Send the second message + oneOf(db).transactionWithResult(with(false), + withDbCallable(msgTxn1)); + oneOf(db).generateBatch(msgTxn1, contactId, + singletonList(message1.getId()), MAX_LATENCY); + will(returnValue(singletonList(message1))); + oneOf(recordWriter).writeMessage(message1); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } }