mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-16 12:49:55 +01:00
Defer marking messages and acks as sent.
This commit is contained in:
@@ -72,6 +72,7 @@ import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.concurrent.TimeUnit.HOURS;
|
||||
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
|
||||
@@ -94,11 +95,15 @@ import static org.briarproject.bramble.test.TestUtils.getTransportId;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
|
||||
private static final int BATCH_CAPACITY =
|
||||
(RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private final Database<Object> database = context.mock(Database.class);
|
||||
private final ShutdownManager shutdownManager =
|
||||
@@ -298,11 +303,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
// Check whether the contact is in the DB (which it's not)
|
||||
exactly(19).of(database).startTransaction();
|
||||
exactly(25).of(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
exactly(19).of(database).containsContact(txn, contactId);
|
||||
exactly(25).of(database).containsContact(txn, contactId);
|
||||
will(returnValue(false));
|
||||
exactly(19).of(database).abortTransaction(txn);
|
||||
exactly(25).of(database).abortTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
@@ -356,6 +361,39 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.getMessageToSend(transaction, contactId, messageId, 123,
|
||||
true));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(true, transaction ->
|
||||
db.getMessagesToAck(transaction, contactId, 123));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(true, transaction ->
|
||||
db.getMessagesToSend(transaction, contactId, 123, 456));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(true, transaction ->
|
||||
db.getUnackedMessagesToSend(transaction, contactId));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(true, transaction ->
|
||||
db.getUnackedMessageBytesToSend(transaction, contactId));
|
||||
@@ -439,6 +477,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.setAckSent(transaction, contactId,
|
||||
singletonList(messageId)));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.setContactAlias(transaction, contactId, alias));
|
||||
@@ -456,6 +503,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.setMessagesSent(transaction, contactId,
|
||||
singletonList(messageId), 123));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.setSyncVersions(transaction, contactId, emptyList()));
|
||||
@@ -918,12 +974,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getMessagesToSend(txn, contactId,
|
||||
MAX_MESSAGE_LENGTH * 2, maxLatency);
|
||||
BATCH_CAPACITY, maxLatency);
|
||||
will(returnValue(ids));
|
||||
// First message
|
||||
oneOf(database).getMessage(txn, messageId);
|
||||
will(returnValue(message));
|
||||
oneOf(database).updateRetransmissionData(txn, contactId, messageId,
|
||||
maxLatency);
|
||||
// Second message
|
||||
oneOf(database).getMessage(txn, messageId1);
|
||||
will(returnValue(message1));
|
||||
oneOf(database).updateRetransmissionData(txn, contactId, messageId1,
|
||||
@@ -937,7 +995,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertEquals(messages, db.generateBatch(transaction, contactId,
|
||||
MAX_MESSAGE_LENGTH * 2, maxLatency)));
|
||||
BATCH_CAPACITY, maxLatency)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -1001,12 +1059,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getRequestedMessagesToSend(txn, contactId,
|
||||
MAX_MESSAGE_LENGTH * 2, maxLatency);
|
||||
BATCH_CAPACITY, maxLatency);
|
||||
will(returnValue(ids));
|
||||
// First message
|
||||
oneOf(database).getMessage(txn, messageId);
|
||||
will(returnValue(message));
|
||||
oneOf(database).updateRetransmissionData(txn, contactId,
|
||||
messageId, maxLatency);
|
||||
// Second message
|
||||
oneOf(database).getMessage(txn, messageId1);
|
||||
will(returnValue(message1));
|
||||
oneOf(database).updateRetransmissionData(txn, contactId,
|
||||
@@ -1020,7 +1080,73 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertEquals(messages, db.generateRequestedBatch(transaction,
|
||||
contactId, MAX_MESSAGE_LENGTH * 2, maxLatency)));
|
||||
contactId, BATCH_CAPACITY, maxLatency)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMessageToSendMessageNotVisible() throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
|
||||
will(returnValue(false));
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertNull(db.getMessageToSend(transaction, contactId,
|
||||
messageId, maxLatency, false)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMessageToSendMessageNotMarkedAsSent() throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getMessage(txn, messageId);
|
||||
will(returnValue(message));
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertEquals(message, db.getMessageToSend(transaction,
|
||||
contactId, messageId, maxLatency, false)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMessageToSendMessageMarkedAsSent() throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getMessage(txn, messageId);
|
||||
will(returnValue(message));
|
||||
oneOf(database).updateRetransmissionData(txn, contactId, messageId,
|
||||
maxLatency);
|
||||
oneOf(database).lowerRequestedFlag(txn, contactId,
|
||||
singletonList(messageId));
|
||||
oneOf(database).commitTransaction(txn);
|
||||
oneOf(eventBus).broadcast(with(any(MessagesSentEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertEquals(message, db.getMessageToSend(transaction,
|
||||
contactId, messageId, maxLatency, true)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -1245,6 +1371,62 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
db.receiveRequest(transaction, contactId, r));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetAckSent() throws Exception {
|
||||
Collection<MessageId> acked = asList(messageId, messageId1);
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
// First message is still visible to the contact - flag lowered
|
||||
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
|
||||
will(returnValue(true));
|
||||
// Second message is no longer visible - flag not lowered
|
||||
oneOf(database).containsVisibleMessage(txn, contactId, messageId1);
|
||||
will(returnValue(false));
|
||||
oneOf(database)
|
||||
.lowerAckFlag(txn, contactId, singletonList(messageId));
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.setAckSent(transaction, contactId, acked));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetMessagesSent() throws Exception {
|
||||
long maxLatency = 123456;
|
||||
Collection<MessageId> sent = asList(messageId, messageId1);
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
oneOf(database).containsContact(txn, contactId);
|
||||
will(returnValue(true));
|
||||
// First message is still visible to the contact - mark as sent
|
||||
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
|
||||
will(returnValue(true));
|
||||
oneOf(database).getMessageLength(txn, messageId);
|
||||
will(returnValue(message.getRawLength()));
|
||||
oneOf(database).updateRetransmissionData(txn, contactId, messageId,
|
||||
maxLatency);
|
||||
// Second message is no longer visible - don't mark as sent
|
||||
oneOf(database).containsVisibleMessage(txn, contactId, messageId1);
|
||||
will(returnValue(false));
|
||||
oneOf(database).lowerRequestedFlag(txn, contactId,
|
||||
singletonList(messageId));
|
||||
oneOf(database).commitTransaction(txn);
|
||||
oneOf(eventBus).broadcast(with(any(MessagesSentEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.setMessagesSent(transaction, contactId, sent, maxLatency));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChangingVisibilityFromInvisibleToVisibleCallsListeners()
|
||||
throws Exception {
|
||||
|
||||
@@ -33,7 +33,9 @@ import java.util.Random;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static java.util.logging.Level.OFF;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
|
||||
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
|
||||
import static org.briarproject.bramble.test.TestUtils.getAuthor;
|
||||
@@ -97,6 +99,9 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
|
||||
// All our transports use a maximum latency of 30 seconds
|
||||
private static final int MAX_LATENCY = 30 * 1000;
|
||||
|
||||
private static final int BATCH_CAPACITY =
|
||||
(RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2;
|
||||
|
||||
protected final File testDir = getTestDirectory();
|
||||
private final File resultsFile = new File(getTestName() + ".tsv");
|
||||
protected final Random random = new Random();
|
||||
@@ -471,7 +476,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
|
||||
benchmark(name, db -> {
|
||||
Connection txn = db.startTransaction();
|
||||
db.getMessagesToSend(txn, pickRandom(contacts).getId(),
|
||||
MAX_MESSAGE_IDS, MAX_LATENCY);
|
||||
BATCH_CAPACITY, MAX_LATENCY);
|
||||
db.commitTransaction(txn);
|
||||
});
|
||||
}
|
||||
@@ -522,7 +527,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
|
||||
benchmark(name, db -> {
|
||||
Connection txn = db.startTransaction();
|
||||
db.getRequestedMessagesToSend(txn, pickRandom(contacts).getId(),
|
||||
MAX_MESSAGE_IDS, MAX_LATENCY);
|
||||
BATCH_CAPACITY, MAX_LATENCY);
|
||||
db.commitTransaction(txn);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singleton;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
@@ -65,6 +64,7 @@ import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADL
|
||||
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
|
||||
import static org.briarproject.bramble.api.db.Metadata.REMOVE;
|
||||
import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
|
||||
@@ -350,14 +350,14 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
// The message is sendable, but too large to send
|
||||
assertOneMessageToSendLazily(db, txn);
|
||||
assertOneMessageToSendEagerly(db, txn);
|
||||
int capacity = RECORD_HEADER_BYTES + message.getRawLength() - 1;
|
||||
Collection<MessageId> ids =
|
||||
db.getMessagesToSend(txn, contactId, message.getRawLength() - 1,
|
||||
MAX_LATENCY);
|
||||
db.getMessagesToSend(txn, contactId, capacity, MAX_LATENCY);
|
||||
assertTrue(ids.isEmpty());
|
||||
|
||||
// The message is just the right size to send
|
||||
ids = db.getMessagesToSend(txn, contactId, message.getRawLength(),
|
||||
MAX_LATENCY);
|
||||
capacity = RECORD_HEADER_BYTES + message.getRawLength();
|
||||
ids = db.getMessagesToSend(txn, contactId, capacity, MAX_LATENCY);
|
||||
assertEquals(singletonList(messageId), ids);
|
||||
|
||||
db.commitTransaction(txn);
|
||||
@@ -396,16 +396,15 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
Collection<MessageId> ids = db.getMessagesToAck(txn, contactId, 1234);
|
||||
assertEquals(asList(messageId, messageId1), ids);
|
||||
|
||||
// Remove both message IDs
|
||||
// Lower the ack flag
|
||||
db.lowerAckFlag(txn, contactId, asList(messageId, messageId1));
|
||||
|
||||
// Both message IDs should have been removed
|
||||
// No message IDs should be returned
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false));
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
assertEquals(emptyList(), db.getMessagesToAck(txn,
|
||||
contactId, 1234));
|
||||
assertEquals(emptyList(), db.getMessagesToAck(txn, contactId, 1234));
|
||||
|
||||
// Raise the ack flag again
|
||||
db.raiseAckFlag(txn, contactId, messageId);
|
||||
@@ -2603,7 +2602,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
Connection txn) throws Exception {
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
Map<MessageId, Integer> unacked =
|
||||
Collection<MessageId> unacked =
|
||||
db.getUnackedMessagesToSend(txn, contactId);
|
||||
assertTrue(unacked.isEmpty());
|
||||
assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
|
||||
@@ -2613,10 +2612,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
Connection txn) throws Exception {
|
||||
assertTrue(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
Map<MessageId, Integer> unacked =
|
||||
Collection<MessageId> unacked =
|
||||
db.getUnackedMessagesToSend(txn, contactId);
|
||||
assertEquals(singleton(messageId), unacked.keySet());
|
||||
assertEquals(message.getRawLength(), unacked.get(messageId).intValue());
|
||||
assertEquals(singletonList(messageId), unacked);
|
||||
assertEquals(message.getRawLength(),
|
||||
db.getUnackedMessageBytesToSend(txn, contactId));
|
||||
}
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
package org.briarproject.bramble.sync;
|
||||
|
||||
import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||
import org.briarproject.bramble.test.DbExpectations;
|
||||
import org.junit.Test;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.test.TestUtils.getContactId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getMessage;
|
||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getTransportId;
|
||||
|
||||
public class EagerSimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
|
||||
private static final int MAX_LATENCY = Integer.MAX_VALUE;
|
||||
|
||||
private final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
private final EventBus eventBus = context.mock(EventBus.class);
|
||||
private final StreamWriter streamWriter = context.mock(StreamWriter.class);
|
||||
private final SyncRecordWriter recordWriter =
|
||||
context.mock(SyncRecordWriter.class);
|
||||
|
||||
private final ContactId contactId = getContactId();
|
||||
private final TransportId transportId = getTransportId();
|
||||
private final Ack ack =
|
||||
new Ack(singletonList(new MessageId(getRandomId())));
|
||||
private final Message message = getMessage(new GroupId(getRandomId()),
|
||||
MAX_MESSAGE_BODY_LENGTH);
|
||||
private final Message message1 = getMessage(new GroupId(getRandomId()),
|
||||
MAX_MESSAGE_BODY_LENGTH);
|
||||
|
||||
@Test
|
||||
public void testNothingToSendEagerly() throws Exception {
|
||||
EagerSimplexOutgoingSession session =
|
||||
new EagerSimplexOutgoingSession(db, eventBus, contactId,
|
||||
transportId, MAX_LATENCY, streamWriter, recordWriter);
|
||||
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
Transaction noIdsTxn = new Transaction(null, true);
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// No acks to send
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noAckTxn));
|
||||
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(null));
|
||||
// No messages to send
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(noIdsTxn));
|
||||
oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId);
|
||||
will(returnValue(emptyList()));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
oneOf(eventBus).removeListener(session);
|
||||
}});
|
||||
|
||||
session.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSomethingToSendEagerly() throws Exception {
|
||||
EagerSimplexOutgoingSession session =
|
||||
new EagerSimplexOutgoingSession(db, eventBus, contactId,
|
||||
transportId, MAX_LATENCY, streamWriter, recordWriter);
|
||||
|
||||
Transaction ackTxn = new Transaction(null, false);
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
Transaction idsTxn = new Transaction(null, true);
|
||||
Transaction msgTxn = new Transaction(null, false);
|
||||
Transaction msgTxn1 = new Transaction(null, false);
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// One ack to send
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(ackTxn));
|
||||
oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(ack));
|
||||
oneOf(recordWriter).writeAck(ack);
|
||||
// No more acks
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noAckTxn));
|
||||
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(null));
|
||||
// Two messages to send
|
||||
oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn));
|
||||
oneOf(db).getUnackedMessagesToSend(idsTxn, contactId);
|
||||
will(returnValue(asList(message.getId(), message1.getId())));
|
||||
// Try to send the first message - it's no longer shared
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(msgTxn));
|
||||
oneOf(db).getMessageToSend(msgTxn, contactId, message.getId(),
|
||||
MAX_LATENCY, true);
|
||||
will(returnValue(null));
|
||||
// Send the second message
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(msgTxn1));
|
||||
oneOf(db).getMessageToSend(msgTxn1, contactId, message1.getId(),
|
||||
MAX_LATENCY, true);
|
||||
will(returnValue(message1));
|
||||
oneOf(recordWriter).writeMessage(message1);
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
oneOf(eventBus).removeListener(session);
|
||||
}});
|
||||
|
||||
session.run();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,231 @@
|
||||
package org.briarproject.bramble.sync;
|
||||
|
||||
import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.DeferredSendHandler;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||
import org.briarproject.bramble.test.DbExpectations;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_FILE_PAYLOAD_BYTES;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.test.TestUtils.getContactId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getMessage;
|
||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getTransportId;
|
||||
|
||||
public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
|
||||
private static final int MAX_LATENCY = Integer.MAX_VALUE;
|
||||
|
||||
private final DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
private final EventBus eventBus = context.mock(EventBus.class);
|
||||
private final StreamWriter streamWriter = context.mock(StreamWriter.class);
|
||||
private final SyncRecordWriter recordWriter =
|
||||
context.mock(SyncRecordWriter.class);
|
||||
private final DeferredSendHandler deferredSendHandler =
|
||||
context.mock(DeferredSendHandler.class);
|
||||
|
||||
private final ContactId contactId = getContactId();
|
||||
private final TransportId transportId = getTransportId();
|
||||
private final Message message = getMessage(new GroupId(getRandomId()),
|
||||
MAX_MESSAGE_BODY_LENGTH);
|
||||
private final Message message1 = getMessage(new GroupId(getRandomId()),
|
||||
MAX_MESSAGE_BODY_LENGTH);
|
||||
private final int versionRecordBytes = RECORD_HEADER_BYTES + 1;
|
||||
|
||||
@Test
|
||||
public void testNothingToSend() throws Exception {
|
||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter, deferredSendHandler,
|
||||
MAX_FILE_PAYLOAD_BYTES);
|
||||
|
||||
Transaction noAckIdTxn = new Transaction(null, true);
|
||||
Transaction noMsgIdTxn = new Transaction(null, true);
|
||||
|
||||
int capacityForMessages = MAX_FILE_PAYLOAD_BYTES - versionRecordBytes;
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// Calculate capacity for acks
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes));
|
||||
// No messages to ack
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(noAckIdTxn));
|
||||
oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(emptyList()));
|
||||
// Calculate capacity for messages
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes));
|
||||
// No messages to send
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(noMsgIdTxn));
|
||||
oneOf(db).getMessagesToSend(noMsgIdTxn, contactId,
|
||||
capacityForMessages, MAX_LATENCY);
|
||||
will(returnValue(emptyList()));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
oneOf(eventBus).removeListener(session);
|
||||
}});
|
||||
|
||||
session.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSomethingToSend() throws Exception {
|
||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter, deferredSendHandler,
|
||||
MAX_FILE_PAYLOAD_BYTES);
|
||||
|
||||
Transaction ackIdTxn = new Transaction(null, true);
|
||||
Transaction noAckIdTxn = new Transaction(null, true);
|
||||
Transaction msgIdTxn = new Transaction(null, true);
|
||||
Transaction msgTxn = new Transaction(null, true);
|
||||
|
||||
int ackRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH;
|
||||
int capacityForMessages =
|
||||
MAX_FILE_PAYLOAD_BYTES - versionRecordBytes - ackRecordBytes;
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// Calculate capacity for acks
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes));
|
||||
// One message to ack
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(ackIdTxn));
|
||||
oneOf(db).getMessagesToAck(ackIdTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(singletonList(message.getId())));
|
||||
// Send the ack
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes));
|
||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||
oneOf(deferredSendHandler)
|
||||
.onAckSent(singletonList(message.getId()));
|
||||
// No more messages to ack
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(noAckIdTxn));
|
||||
oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(emptyList()));
|
||||
// Calculate capacity for messages
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes + ackRecordBytes));
|
||||
// One message to send
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(msgIdTxn));
|
||||
oneOf(db).getMessagesToSend(msgIdTxn, contactId,
|
||||
capacityForMessages, MAX_LATENCY);
|
||||
will(returnValue(singletonList(message1.getId())));
|
||||
// Send the message
|
||||
oneOf(db).transactionWithNullableResult(with(true),
|
||||
withNullableDbCallable(msgTxn));
|
||||
oneOf(db).getMessageToSend(msgTxn, contactId, message1.getId(),
|
||||
MAX_LATENCY, false);
|
||||
will(returnValue(message1));
|
||||
oneOf(recordWriter).writeMessage(message1);
|
||||
oneOf(deferredSendHandler).onMessageSent(message1.getId());
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
oneOf(eventBus).removeListener(session);
|
||||
}});
|
||||
|
||||
session.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllCapacityUsedByAcks() throws Exception {
|
||||
// The file has enough capacity for a max-size ack record, another
|
||||
// ack record with one message ID, and a few bytes left over
|
||||
int capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS
|
||||
+ RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1;
|
||||
|
||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter, deferredSendHandler, capacity);
|
||||
|
||||
Transaction ackIdTxn1 = new Transaction(null, true);
|
||||
Transaction ackIdTxn2 = new Transaction(null, true);
|
||||
|
||||
int firstAckRecordBytes =
|
||||
RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS;
|
||||
int secondAckRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH;
|
||||
|
||||
List<MessageId> idsInFirstAck = new ArrayList<>(MAX_MESSAGE_IDS);
|
||||
for (int i = 0; i < MAX_MESSAGE_IDS; i++) {
|
||||
idsInFirstAck.add(new MessageId(getRandomId()));
|
||||
}
|
||||
List<MessageId> idsInSecondAck =
|
||||
singletonList(new MessageId(getRandomId()));
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// Calculate capacity for acks
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes));
|
||||
// Load the IDs for the first ack record
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(ackIdTxn1));
|
||||
oneOf(db).getMessagesToAck(ackIdTxn1, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(idsInFirstAck));
|
||||
// Send the first ack record
|
||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||
oneOf(deferredSendHandler).onAckSent(idsInFirstAck);
|
||||
// Calculate remaining capacity for acks
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes));
|
||||
// Load the IDs for the second ack record
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(ackIdTxn2));
|
||||
oneOf(db).getMessagesToAck(ackIdTxn2, contactId, 1);
|
||||
will(returnValue(idsInSecondAck));
|
||||
// Send the second ack record
|
||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||
oneOf(deferredSendHandler).onAckSent(idsInSecondAck);
|
||||
// Not enough capacity left for another ack
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
|
||||
+ secondAckRecordBytes));
|
||||
// Not enough capacity left for any messages
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
|
||||
+ secondAckRecordBytes));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
oneOf(eventBus).removeListener(session);
|
||||
}});
|
||||
|
||||
session.run();
|
||||
}
|
||||
}
|
||||
@@ -16,14 +16,10 @@ import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||
import org.briarproject.bramble.test.DbExpectations;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.sync.SimplexOutgoingSession.BATCH_CAPACITY;
|
||||
import static org.briarproject.bramble.test.TestUtils.getContactId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getMessage;
|
||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||
@@ -45,14 +41,12 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
new Ack(singletonList(new MessageId(getRandomId())));
|
||||
private final Message message = getMessage(new GroupId(getRandomId()),
|
||||
MAX_MESSAGE_BODY_LENGTH);
|
||||
private final Message message1 = getMessage(new GroupId(getRandomId()),
|
||||
MAX_MESSAGE_BODY_LENGTH);
|
||||
|
||||
@Test
|
||||
public void testNothingToSend() throws Exception {
|
||||
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
false, streamWriter, recordWriter);
|
||||
streamWriter, recordWriter);
|
||||
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
Transaction noMsgTxn = new Transaction(null, false);
|
||||
@@ -71,7 +65,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noMsgTxn));
|
||||
oneOf(db).generateBatch(noMsgTxn, contactId,
|
||||
MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
|
||||
BATCH_CAPACITY, MAX_LATENCY);
|
||||
will(returnValue(null));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
@@ -82,44 +76,11 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
session.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNothingToSendEagerly() throws Exception {
|
||||
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
true, streamWriter, recordWriter);
|
||||
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
Transaction noIdsTxn = new Transaction(null, true);
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// No acks to send
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noAckTxn));
|
||||
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(null));
|
||||
// No messages to send
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(noIdsTxn));
|
||||
oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId);
|
||||
will(returnValue(emptyMap()));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
oneOf(eventBus).removeListener(session);
|
||||
}});
|
||||
|
||||
session.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSomethingToSend() throws Exception {
|
||||
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
false, streamWriter, recordWriter);
|
||||
streamWriter, recordWriter);
|
||||
|
||||
Transaction ackTxn = new Transaction(null, false);
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
@@ -146,14 +107,14 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(msgTxn));
|
||||
oneOf(db).generateBatch(msgTxn, contactId,
|
||||
MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
|
||||
BATCH_CAPACITY, MAX_LATENCY);
|
||||
will(returnValue(singletonList(message)));
|
||||
oneOf(recordWriter).writeMessage(message);
|
||||
// No more messages
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noMsgTxn));
|
||||
oneOf(db).generateBatch(noMsgTxn, contactId,
|
||||
MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
|
||||
BATCH_CAPACITY, MAX_LATENCY);
|
||||
will(returnValue(null));
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
@@ -163,63 +124,4 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
|
||||
session.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSomethingToSendEagerly() throws Exception {
|
||||
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
true, streamWriter, recordWriter);
|
||||
|
||||
Map<MessageId, Integer> unacked = new LinkedHashMap<>();
|
||||
unacked.put(message.getId(), message.getRawLength());
|
||||
unacked.put(message1.getId(), message1.getRawLength());
|
||||
|
||||
Transaction ackTxn = new Transaction(null, false);
|
||||
Transaction noAckTxn = new Transaction(null, false);
|
||||
Transaction idsTxn = new Transaction(null, true);
|
||||
Transaction msgTxn = new Transaction(null, false);
|
||||
Transaction msgTxn1 = new Transaction(null, false);
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// One ack to send
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(ackTxn));
|
||||
oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(ack));
|
||||
oneOf(recordWriter).writeAck(ack);
|
||||
// No more acks
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noAckTxn));
|
||||
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
|
||||
will(returnValue(null));
|
||||
// Two messages to send
|
||||
oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn));
|
||||
oneOf(db).getUnackedMessagesToSend(idsTxn, contactId);
|
||||
will(returnValue(unacked));
|
||||
// Send the first message
|
||||
oneOf(db).transactionWithResult(with(false),
|
||||
withDbCallable(msgTxn));
|
||||
oneOf(db).generateBatch(msgTxn, contactId,
|
||||
singletonList(message.getId()), MAX_LATENCY);
|
||||
will(returnValue(singletonList(message)));
|
||||
oneOf(recordWriter).writeMessage(message);
|
||||
// Send the second message
|
||||
oneOf(db).transactionWithResult(with(false),
|
||||
withDbCallable(msgTxn1));
|
||||
oneOf(db).generateBatch(msgTxn1, contactId,
|
||||
singletonList(message1.getId()), MAX_LATENCY);
|
||||
will(returnValue(singletonList(message1)));
|
||||
oneOf(recordWriter).writeMessage(message1);
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
oneOf(eventBus).removeListener(session);
|
||||
}});
|
||||
|
||||
session.run();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user