mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-19 22:29:53 +01:00
Replace DeferredSendHandler with OutgoingSessionRecord.
This commit is contained in:
@@ -1,15 +0,0 @@
|
|||||||
package org.briarproject.bramble.api.sync;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An interface for holding the IDs of messages sent and acked during an
|
|
||||||
* outgoing {@link SyncSession} so they can be recorded in the DB as sent
|
|
||||||
* or acked at some later time.
|
|
||||||
*/
|
|
||||||
public interface DeferredSendHandler {
|
|
||||||
|
|
||||||
void onAckSent(Collection<MessageId> acked);
|
|
||||||
|
|
||||||
void onMessageSent(MessageId sent);
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package org.briarproject.bramble.api.sync;
|
||||||
|
|
||||||
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A container for holding the IDs of messages sent and acked during an
|
||||||
|
* outgoing {@link SyncSession}, so they can be recorded in the DB as sent
|
||||||
|
* or acked at some later time.
|
||||||
|
*/
|
||||||
|
@ThreadSafe
|
||||||
|
@NotNullByDefault
|
||||||
|
public class OutgoingSessionRecord {
|
||||||
|
|
||||||
|
private final Collection<MessageId> ackedIds = new CopyOnWriteArrayList<>();
|
||||||
|
private final Collection<MessageId> sentIds = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
public void onAckSent(Collection<MessageId> acked) {
|
||||||
|
ackedIds.addAll(acked);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onMessageSent(MessageId sent) {
|
||||||
|
sentIds.add(sent);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<MessageId> getAckedIds() {
|
||||||
|
return ackedIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<MessageId> getSentIds() {
|
||||||
|
return sentIds;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,9 +7,9 @@ import org.briarproject.bramble.api.event.EventBus;
|
|||||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
import org.briarproject.bramble.api.plugin.TransportId;
|
import org.briarproject.bramble.api.plugin.TransportId;
|
||||||
import org.briarproject.bramble.api.sync.Ack;
|
import org.briarproject.bramble.api.sync.Ack;
|
||||||
import org.briarproject.bramble.api.sync.DeferredSendHandler;
|
|
||||||
import org.briarproject.bramble.api.sync.Message;
|
import org.briarproject.bramble.api.sync.Message;
|
||||||
import org.briarproject.bramble.api.sync.MessageId;
|
import org.briarproject.bramble.api.sync.MessageId;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||||
|
|
||||||
@@ -29,7 +29,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LEN
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link SimplexOutgoingSession} for sending and acking messages via a
|
* A {@link SimplexOutgoingSession} for sending and acking messages via a
|
||||||
* mailbox. The session uses a {@link DeferredSendHandler} to record the IDs
|
* mailbox. The session uses a {@link OutgoingSessionRecord} to record the IDs
|
||||||
* of the messages sent and acked during the session so that they can be
|
* of the messages sent and acked during the session so that they can be
|
||||||
* recorded in the DB as sent or acked after the file has been successfully
|
* recorded in the DB as sent or acked after the file has been successfully
|
||||||
* uploaded to the mailbox.
|
* uploaded to the mailbox.
|
||||||
@@ -41,7 +41,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
|||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
getLogger(MailboxOutgoingSession.class.getName());
|
getLogger(MailboxOutgoingSession.class.getName());
|
||||||
|
|
||||||
private final DeferredSendHandler deferredSendHandler;
|
private final OutgoingSessionRecord sessionRecord;
|
||||||
private final long initialCapacity;
|
private final long initialCapacity;
|
||||||
|
|
||||||
MailboxOutgoingSession(DatabaseComponent db,
|
MailboxOutgoingSession(DatabaseComponent db,
|
||||||
@@ -51,11 +51,11 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
|||||||
long maxLatency,
|
long maxLatency,
|
||||||
StreamWriter streamWriter,
|
StreamWriter streamWriter,
|
||||||
SyncRecordWriter recordWriter,
|
SyncRecordWriter recordWriter,
|
||||||
DeferredSendHandler deferredSendHandler,
|
OutgoingSessionRecord sessionRecord,
|
||||||
long capacity) {
|
long capacity) {
|
||||||
super(db, eventBus, contactId, transportId, maxLatency, streamWriter,
|
super(db, eventBus, contactId, transportId, maxLatency, streamWriter,
|
||||||
recordWriter);
|
recordWriter);
|
||||||
this.deferredSendHandler = deferredSendHandler;
|
this.sessionRecord = sessionRecord;
|
||||||
this.initialCapacity = capacity;
|
this.initialCapacity = capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,7 +65,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
|||||||
Collection<MessageId> idsToAck = loadMessageIdsToAck();
|
Collection<MessageId> idsToAck = loadMessageIdsToAck();
|
||||||
if (idsToAck.isEmpty()) break;
|
if (idsToAck.isEmpty()) break;
|
||||||
recordWriter.writeAck(new Ack(idsToAck));
|
recordWriter.writeAck(new Ack(idsToAck));
|
||||||
deferredSendHandler.onAckSent(idsToAck);
|
sessionRecord.onAckSent(idsToAck);
|
||||||
LOG.info("Sent ack");
|
LOG.info("Sent ack");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -96,7 +96,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
|||||||
db.getMessageToSend(txn, contactId, m, maxLatency, false));
|
db.getMessageToSend(txn, contactId, m, maxLatency, false));
|
||||||
if (message == null) continue; // No longer shared
|
if (message == null) continue; // No longer shared
|
||||||
recordWriter.writeMessage(message);
|
recordWriter.writeMessage(message);
|
||||||
deferredSendHandler.onMessageSent(m);
|
sessionRecord.onMessageSent(m);
|
||||||
LOG.info("Sent message");
|
LOG.info("Sent message");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,10 +6,10 @@ import org.briarproject.bramble.api.db.Transaction;
|
|||||||
import org.briarproject.bramble.api.event.EventBus;
|
import org.briarproject.bramble.api.event.EventBus;
|
||||||
import org.briarproject.bramble.api.plugin.TransportId;
|
import org.briarproject.bramble.api.plugin.TransportId;
|
||||||
import org.briarproject.bramble.api.sync.Ack;
|
import org.briarproject.bramble.api.sync.Ack;
|
||||||
import org.briarproject.bramble.api.sync.DeferredSendHandler;
|
|
||||||
import org.briarproject.bramble.api.sync.GroupId;
|
import org.briarproject.bramble.api.sync.GroupId;
|
||||||
import org.briarproject.bramble.api.sync.Message;
|
import org.briarproject.bramble.api.sync.Message;
|
||||||
import org.briarproject.bramble.api.sync.MessageId;
|
import org.briarproject.bramble.api.sync.MessageId;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||||
import org.briarproject.bramble.api.sync.Versions;
|
import org.briarproject.bramble.api.sync.Versions;
|
||||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||||
@@ -30,6 +30,7 @@ import static org.briarproject.bramble.test.TestUtils.getContactId;
|
|||||||
import static org.briarproject.bramble.test.TestUtils.getMessage;
|
import static org.briarproject.bramble.test.TestUtils.getMessage;
|
||||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||||
import static org.briarproject.bramble.test.TestUtils.getTransportId;
|
import static org.briarproject.bramble.test.TestUtils.getTransportId;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||||
|
|
||||||
@@ -40,8 +41,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
private final StreamWriter streamWriter = context.mock(StreamWriter.class);
|
private final StreamWriter streamWriter = context.mock(StreamWriter.class);
|
||||||
private final SyncRecordWriter recordWriter =
|
private final SyncRecordWriter recordWriter =
|
||||||
context.mock(SyncRecordWriter.class);
|
context.mock(SyncRecordWriter.class);
|
||||||
private final DeferredSendHandler deferredSendHandler =
|
|
||||||
context.mock(DeferredSendHandler.class);
|
|
||||||
|
|
||||||
private final ContactId contactId = getContactId();
|
private final ContactId contactId = getContactId();
|
||||||
private final TransportId transportId = getTransportId();
|
private final TransportId transportId = getTransportId();
|
||||||
@@ -53,9 +52,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNothingToSend() throws Exception {
|
public void testNothingToSend() throws Exception {
|
||||||
|
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||||
eventBus, contactId, transportId, MAX_LATENCY,
|
eventBus, contactId, transportId, MAX_LATENCY,
|
||||||
streamWriter, recordWriter, deferredSendHandler,
|
streamWriter, recordWriter, sessionRecord,
|
||||||
MAX_FILE_PAYLOAD_BYTES);
|
MAX_FILE_PAYLOAD_BYTES);
|
||||||
|
|
||||||
Transaction noAckIdTxn = new Transaction(null, true);
|
Transaction noAckIdTxn = new Transaction(null, true);
|
||||||
@@ -92,13 +92,17 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
}});
|
}});
|
||||||
|
|
||||||
session.run();
|
session.run();
|
||||||
|
|
||||||
|
assertEquals(emptyList(), sessionRecord.getAckedIds());
|
||||||
|
assertEquals(emptyList(), sessionRecord.getSentIds());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSomethingToSend() throws Exception {
|
public void testSomethingToSend() throws Exception {
|
||||||
|
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||||
eventBus, contactId, transportId, MAX_LATENCY,
|
eventBus, contactId, transportId, MAX_LATENCY,
|
||||||
streamWriter, recordWriter, deferredSendHandler,
|
streamWriter, recordWriter, sessionRecord,
|
||||||
MAX_FILE_PAYLOAD_BYTES);
|
MAX_FILE_PAYLOAD_BYTES);
|
||||||
|
|
||||||
Transaction ackIdTxn = new Transaction(null, true);
|
Transaction ackIdTxn = new Transaction(null, true);
|
||||||
@@ -127,8 +131,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes));
|
will(returnValue((long) versionRecordBytes));
|
||||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||||
oneOf(deferredSendHandler)
|
|
||||||
.onAckSent(singletonList(message.getId()));
|
|
||||||
// No more messages to ack
|
// No more messages to ack
|
||||||
oneOf(db).transactionWithResult(with(true),
|
oneOf(db).transactionWithResult(with(true),
|
||||||
withDbCallable(noAckIdTxn));
|
withDbCallable(noAckIdTxn));
|
||||||
@@ -150,7 +152,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
MAX_LATENCY, false);
|
MAX_LATENCY, false);
|
||||||
will(returnValue(message1));
|
will(returnValue(message1));
|
||||||
oneOf(recordWriter).writeMessage(message1);
|
oneOf(recordWriter).writeMessage(message1);
|
||||||
oneOf(deferredSendHandler).onMessageSent(message1.getId());
|
|
||||||
// Send the end of stream marker
|
// Send the end of stream marker
|
||||||
oneOf(streamWriter).sendEndOfStream();
|
oneOf(streamWriter).sendEndOfStream();
|
||||||
// Remove listener
|
// Remove listener
|
||||||
@@ -158,6 +159,11 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
}});
|
}});
|
||||||
|
|
||||||
session.run();
|
session.run();
|
||||||
|
|
||||||
|
assertEquals(singletonList(message.getId()),
|
||||||
|
sessionRecord.getAckedIds());
|
||||||
|
assertEquals(singletonList(message1.getId()),
|
||||||
|
sessionRecord.getSentIds());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -167,9 +173,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
long capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS
|
long capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS
|
||||||
+ RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1;
|
+ RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1;
|
||||||
|
|
||||||
|
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||||
eventBus, contactId, transportId, MAX_LATENCY,
|
eventBus, contactId, transportId, MAX_LATENCY,
|
||||||
streamWriter, recordWriter, deferredSendHandler, capacity);
|
streamWriter, recordWriter, sessionRecord, capacity);
|
||||||
|
|
||||||
Transaction ackIdTxn1 = new Transaction(null, true);
|
Transaction ackIdTxn1 = new Transaction(null, true);
|
||||||
Transaction ackIdTxn2 = new Transaction(null, true);
|
Transaction ackIdTxn2 = new Transaction(null, true);
|
||||||
@@ -184,6 +191,9 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
}
|
}
|
||||||
List<MessageId> idsInSecondAck =
|
List<MessageId> idsInSecondAck =
|
||||||
singletonList(new MessageId(getRandomId()));
|
singletonList(new MessageId(getRandomId()));
|
||||||
|
List<MessageId> allIds = new ArrayList<>(MAX_MESSAGE_IDS + 1);
|
||||||
|
allIds.addAll(idsInFirstAck);
|
||||||
|
allIds.addAll(idsInSecondAck);
|
||||||
|
|
||||||
context.checking(new DbExpectations() {{
|
context.checking(new DbExpectations() {{
|
||||||
// Add listener
|
// Add listener
|
||||||
@@ -200,7 +210,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
will(returnValue(idsInFirstAck));
|
will(returnValue(idsInFirstAck));
|
||||||
// Send the first ack record
|
// Send the first ack record
|
||||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||||
oneOf(deferredSendHandler).onAckSent(idsInFirstAck);
|
|
||||||
// Calculate remaining capacity for acks
|
// Calculate remaining capacity for acks
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes));
|
will(returnValue((long) versionRecordBytes + firstAckRecordBytes));
|
||||||
@@ -211,7 +220,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
will(returnValue(idsInSecondAck));
|
will(returnValue(idsInSecondAck));
|
||||||
// Send the second ack record
|
// Send the second ack record
|
||||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||||
oneOf(deferredSendHandler).onAckSent(idsInSecondAck);
|
|
||||||
// Not enough capacity left for another ack
|
// Not enough capacity left for another ack
|
||||||
oneOf(recordWriter).getBytesWritten();
|
oneOf(recordWriter).getBytesWritten();
|
||||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
|
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
|
||||||
@@ -227,5 +235,8 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
|||||||
}});
|
}});
|
||||||
|
|
||||||
session.run();
|
session.run();
|
||||||
|
|
||||||
|
assertEquals(allIds, sessionRecord.getAckedIds());
|
||||||
|
assertEquals(emptyList(), sessionRecord.getSentIds());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user