Add tests for eager retransmission.

This commit is contained in:
akwizgran
2021-06-10 17:28:30 +01:00
parent 847650f280
commit a960bfb2c1

View File

@@ -17,9 +17,14 @@ import org.briarproject.bramble.test.DbExpectations;
import org.briarproject.bramble.test.ImmediateExecutor;
import org.junit.Test;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getMessage;
@@ -39,14 +44,19 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = getContactId();
private final TransportId transportId = getTransportId();
private final Message message = getMessage(new GroupId(getRandomId()));
private final MessageId messageId = message.getId();
private final Ack ack =
new Ack(singletonList(new MessageId(getRandomId())));
private final Message message = getMessage(new GroupId(getRandomId()),
MAX_MESSAGE_BODY_LENGTH);
private final Message message1 = getMessage(new GroupId(getRandomId()),
MAX_MESSAGE_BODY_LENGTH);
@Test
public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
false, streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false);
@@ -63,8 +73,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
// No messages to send
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
oneOf(db).generateBatch(noMsgTxn, contactId,
MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
will(returnValue(null));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
@@ -75,12 +85,45 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
session.run();
}
@Test
public void testNothingToSendEagerly() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
true, streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false);
Transaction noIdsTxn = new Transaction(null, true);
context.checking(new DbExpectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// Send the protocol versions
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
// No acks to send
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
// No messages to send
oneOf(db).transactionWithResult(with(true),
withDbCallable(noIdsTxn));
oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId);
will(returnValue(emptyMap()));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
// Remove listener
oneOf(eventBus).removeListener(session);
}});
session.run();
}
@Test
public void testSomethingToSend() throws Exception {
Ack ack = new Ack(singletonList(messageId));
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
false, streamWriter, recordWriter);
Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false);
@@ -100,8 +143,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
// One message to send
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(msgTxn));
oneOf(db).generateBatch(with(msgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
oneOf(db).generateBatch(msgTxn, contactId,
MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
will(returnValue(singletonList(message)));
oneOf(recordWriter).writeMessage(message);
// No more acks
@@ -112,8 +155,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
// No more messages
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
oneOf(db).generateBatch(noMsgTxn, contactId,
MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
will(returnValue(null));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
@@ -123,4 +166,63 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
session.run();
}
@Test
public void testSomethingToSendEagerly() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
true, streamWriter, recordWriter);
Map<MessageId, Integer> unacked = new LinkedHashMap<>();
unacked.put(message.getId(), message.getRawLength());
unacked.put(message1.getId(), message1.getRawLength());
Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false);
Transaction idsTxn = new Transaction(null, true);
Transaction msgTxn = new Transaction(null, false);
Transaction msgTxn1 = new Transaction(null, false);
context.checking(new DbExpectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// Send the protocol versions
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
// One ack to send
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(ackTxn));
oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(ack));
oneOf(recordWriter).writeAck(ack);
// No more acks
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
// Two messages to send
oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn));
oneOf(db).getUnackedMessagesToSend(idsTxn, contactId);
will(returnValue(unacked));
// Send the first message
oneOf(db).transactionWithResult(with(false),
withDbCallable(msgTxn));
oneOf(db).generateBatch(msgTxn, contactId,
singletonList(message.getId()), MAX_LATENCY);
will(returnValue(singletonList(message)));
oneOf(recordWriter).writeMessage(message);
// Send the second message
oneOf(db).transactionWithResult(with(false),
withDbCallable(msgTxn1));
oneOf(db).generateBatch(msgTxn1, contactId,
singletonList(message1.getId()), MAX_LATENCY);
will(returnValue(singletonList(message1)));
oneOf(recordWriter).writeMessage(message1);
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
// Remove listener
oneOf(eventBus).removeListener(session);
}});
session.run();
}
}