From 10ab60569ba1e1c0b4e1c18627d79c40c765d7b1 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 16 Jun 2022 13:14:49 +0100 Subject: [PATCH] Replace DeferredSendHandler with OutgoingSessionRecord. --- .../bramble/api/sync/DeferredSendHandler.java | 15 -------- .../api/sync/OutgoingSessionRecord.java | 37 +++++++++++++++++++ .../bramble/sync/MailboxOutgoingSession.java | 14 +++---- .../sync/MailboxOutgoingSessionTest.java | 33 +++++++++++------ 4 files changed, 66 insertions(+), 33 deletions(-) delete mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/sync/OutgoingSessionRecord.java diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java deleted file mode 100644 index 1966b3bb6..000000000 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.briarproject.bramble.api.sync; - -import java.util.Collection; - -/** - * An interface for holding the IDs of messages sent and acked during an - * outgoing {@link SyncSession} so they can be recorded in the DB as sent - * or acked at some later time. - */ -public interface DeferredSendHandler { - - void onAckSent(Collection acked); - - void onMessageSent(MessageId sent); -} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/OutgoingSessionRecord.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/OutgoingSessionRecord.java new file mode 100644 index 000000000..6f1077a23 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/OutgoingSessionRecord.java @@ -0,0 +1,37 @@ +package org.briarproject.bramble.api.sync; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +import java.util.Collection; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * A container for holding the IDs of messages sent and acked during an + * outgoing {@link SyncSession}, so they can be recorded in the DB as sent + * or acked at some later time. + */ +@ThreadSafe +@NotNullByDefault +public class OutgoingSessionRecord { + + private final Collection ackedIds = new CopyOnWriteArrayList<>(); + private final Collection sentIds = new CopyOnWriteArrayList<>(); + + public void onAckSent(Collection acked) { + ackedIds.addAll(acked); + } + + public void onMessageSent(MessageId sent) { + sentIds.add(sent); + } + + public Collection getAckedIds() { + return ackedIds; + } + + public Collection getSentIds() { + return sentIds; + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java index 5d4e06fb4..3dcf95a3a 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java @@ -7,9 +7,9 @@ import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.Ack; -import org.briarproject.bramble.api.sync.DeferredSendHandler; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.transport.StreamWriter; @@ -29,7 +29,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LEN /** * A {@link SimplexOutgoingSession} for sending and acking messages via a - * mailbox. The session uses a {@link DeferredSendHandler} to record the IDs + * mailbox. The session uses a {@link OutgoingSessionRecord} to record the IDs * of the messages sent and acked during the session so that they can be * recorded in the DB as sent or acked after the file has been successfully * uploaded to the mailbox. @@ -41,7 +41,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { private static final Logger LOG = getLogger(MailboxOutgoingSession.class.getName()); - private final DeferredSendHandler deferredSendHandler; + private final OutgoingSessionRecord sessionRecord; private final long initialCapacity; MailboxOutgoingSession(DatabaseComponent db, @@ -51,11 +51,11 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { long maxLatency, StreamWriter streamWriter, SyncRecordWriter recordWriter, - DeferredSendHandler deferredSendHandler, + OutgoingSessionRecord sessionRecord, long capacity) { super(db, eventBus, contactId, transportId, maxLatency, streamWriter, recordWriter); - this.deferredSendHandler = deferredSendHandler; + this.sessionRecord = sessionRecord; this.initialCapacity = capacity; } @@ -65,7 +65,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { Collection idsToAck = loadMessageIdsToAck(); if (idsToAck.isEmpty()) break; recordWriter.writeAck(new Ack(idsToAck)); - deferredSendHandler.onAckSent(idsToAck); + sessionRecord.onAckSent(idsToAck); LOG.info("Sent ack"); } } @@ -96,7 +96,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { db.getMessageToSend(txn, contactId, m, maxLatency, false)); if (message == null) continue; // No longer shared recordWriter.writeMessage(message); - deferredSendHandler.onMessageSent(m); + sessionRecord.onMessageSent(m); LOG.info("Sent message"); } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java index 904882847..814599358 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java @@ -6,10 +6,10 @@ import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.Ack; -import org.briarproject.bramble.api.sync.DeferredSendHandler; import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.transport.StreamWriter; @@ -30,6 +30,7 @@ import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getTransportId; +import static org.junit.Assert.assertEquals; public class MailboxOutgoingSessionTest extends BrambleMockTestCase { @@ -40,8 +41,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { private final StreamWriter streamWriter = context.mock(StreamWriter.class); private final SyncRecordWriter recordWriter = context.mock(SyncRecordWriter.class); - private final DeferredSendHandler deferredSendHandler = - context.mock(DeferredSendHandler.class); private final ContactId contactId = getContactId(); private final TransportId transportId = getTransportId(); @@ -53,9 +52,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { @Test public void testNothingToSend() throws Exception { + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); MailboxOutgoingSession session = new MailboxOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter, deferredSendHandler, + streamWriter, recordWriter, sessionRecord, MAX_FILE_PAYLOAD_BYTES); Transaction noAckIdTxn = new Transaction(null, true); @@ -92,13 +92,17 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { }}); session.run(); + + assertEquals(emptyList(), sessionRecord.getAckedIds()); + assertEquals(emptyList(), sessionRecord.getSentIds()); } @Test public void testSomethingToSend() throws Exception { + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); MailboxOutgoingSession session = new MailboxOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter, deferredSendHandler, + streamWriter, recordWriter, sessionRecord, MAX_FILE_PAYLOAD_BYTES); Transaction ackIdTxn = new Transaction(null, true); @@ -127,8 +131,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes)); oneOf(recordWriter).writeAck(with(any(Ack.class))); - oneOf(deferredSendHandler) - .onAckSent(singletonList(message.getId())); // No more messages to ack oneOf(db).transactionWithResult(with(true), withDbCallable(noAckIdTxn)); @@ -150,7 +152,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { MAX_LATENCY, false); will(returnValue(message1)); oneOf(recordWriter).writeMessage(message1); - oneOf(deferredSendHandler).onMessageSent(message1.getId()); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); // Remove listener @@ -158,6 +159,11 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { }}); session.run(); + + assertEquals(singletonList(message.getId()), + sessionRecord.getAckedIds()); + assertEquals(singletonList(message1.getId()), + sessionRecord.getSentIds()); } @Test @@ -167,9 +173,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { long capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS + RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1; + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); MailboxOutgoingSession session = new MailboxOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter, deferredSendHandler, capacity); + streamWriter, recordWriter, sessionRecord, capacity); Transaction ackIdTxn1 = new Transaction(null, true); Transaction ackIdTxn2 = new Transaction(null, true); @@ -184,6 +191,9 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { } List idsInSecondAck = singletonList(new MessageId(getRandomId())); + List allIds = new ArrayList<>(MAX_MESSAGE_IDS + 1); + allIds.addAll(idsInFirstAck); + allIds.addAll(idsInSecondAck); context.checking(new DbExpectations() {{ // Add listener @@ -200,7 +210,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { will(returnValue(idsInFirstAck)); // Send the first ack record oneOf(recordWriter).writeAck(with(any(Ack.class))); - oneOf(deferredSendHandler).onAckSent(idsInFirstAck); // Calculate remaining capacity for acks oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes + firstAckRecordBytes)); @@ -211,7 +220,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { will(returnValue(idsInSecondAck)); // Send the second ack record oneOf(recordWriter).writeAck(with(any(Ack.class))); - oneOf(deferredSendHandler).onAckSent(idsInSecondAck); // Not enough capacity left for another ack oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes + firstAckRecordBytes @@ -227,5 +235,8 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { }}); session.run(); + + assertEquals(allIds, sessionRecord.getAckedIds()); + assertEquals(emptyList(), sessionRecord.getSentIds()); } }