Update SimplexOutgoingSession to support sending unacked messages.

This commit is contained in:
akwizgran
2021-06-08 12:10:52 +01:00
parent 9a58b37ce2
commit 77a3199aac
3 changed files with 98 additions and 5 deletions

View File

@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
@@ -22,7 +23,11 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -61,6 +66,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency;
private final boolean eager;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
private final AtomicInteger outstandingQueries;
@@ -70,7 +76,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, TransportId transportId,
int maxLatency, StreamWriter streamWriter,
int maxLatency, boolean eager, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
@@ -78,6 +84,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
this.eager = eager;
this.streamWriter = streamWriter;
this.recordWriter = recordWriter;
outstandingQueries = new AtomicInteger(2); // One per type of record
@@ -93,7 +100,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
// Start a query for each type of record
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
if (eager) dbExecutor.execute(new LoadUnackedMessageIds());
else dbExecutor.execute(new GenerateBatch());
// Write records until interrupted or no more records to write
try {
while (!interrupted) {
@@ -138,6 +146,91 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
}
}
private class LoadUnackedMessageIds implements Runnable {
@DatabaseExecutor
@Override
public void run() {
if (interrupted) return;
try {
Map<MessageId, Integer> ids =
db.transactionWithResult(true, txn ->
db.getUnackedMessagesToSend(txn, contactId));
if (LOG.isLoggable(INFO)) {
LOG.info(ids.size() + " unacked messages to send");
}
if (ids.isEmpty()) decrementOutstandingQueries();
else dbExecutor.execute(new GenerateEagerBatch(ids));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
}
}
private class GenerateEagerBatch implements Runnable {
private final Map<MessageId, Integer> ids;
private GenerateEagerBatch(Map<MessageId, Integer> ids) {
this.ids = ids;
}
@DatabaseExecutor
@Override
public void run() {
if (interrupted) return;
// Take some message IDs from `ids` to form a batch
Collection<MessageId> batchIds = new ArrayList<>();
long totalLength = 0;
Iterator<Entry<MessageId, Integer>> it =
ids.entrySet().iterator();
while (it.hasNext()) {
// Check whether the next message will fit in the batch
Entry<MessageId, Integer> e = it.next();
int length = e.getValue();
if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break;
// Add the message to the batch
it.remove();
batchIds.add(e.getKey());
totalLength += length;
}
if (batchIds.isEmpty()) throw new AssertionError();
try {
Collection<Message> batch =
db.transactionWithResult(false, txn ->
db.generateBatch(txn, contactId, batchIds,
maxLatency));
writerTasks.add(new WriteEagerBatch(batch, ids));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
}
}
private class WriteEagerBatch implements ThrowingRunnable<IOException> {
private final Collection<Message> batch;
private final Map<MessageId, Integer> ids;
private WriteEagerBatch(Collection<Message> batch,
Map<MessageId, Integer> ids) {
this.batch = batch;
this.ids = ids;
}
@IoExecutor
@Override
public void run() throws IOException {
if (interrupted) return;
for (Message m : batch) recordWriter.writeMessage(m);
LOG.info("Sent eager batch");
if (ids.isEmpty()) decrementOutstandingQueries();
else dbExecutor.execute(new GenerateEagerBatch(ids));
}
}
private class GenerateAck implements Runnable {
@DatabaseExecutor

View File

@@ -65,7 +65,7 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, streamWriter, recordWriter);
maxLatency, false, streamWriter, recordWriter);
}
@Override

View File

@@ -46,7 +46,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter);
false, streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false);
@@ -80,7 +80,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
Ack ack = new Ack(singletonList(messageId));
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter);
false, streamWriter, recordWriter);
Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false);