From 77a3199aac5f87c0748826ac9a057d4cf82a596f Mon Sep 17 00:00:00 2001 From: akwizgran Date: Tue, 8 Jun 2021 12:10:52 +0100 Subject: [PATCH] Update SimplexOutgoingSession to support sending unacked messages. --- .../bramble/sync/SimplexOutgoingSession.java | 97 ++++++++++++++++++- .../bramble/sync/SyncSessionFactoryImpl.java | 2 +- .../sync/SimplexOutgoingSessionTest.java | 4 +- 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 1d32ca4ee..d069d38b0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -15,6 +15,7 @@ import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.Versions; @@ -22,7 +23,11 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -61,6 +66,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final ContactId contactId; private final TransportId transportId; private final int maxLatency; + private final boolean eager; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; private final AtomicInteger outstandingQueries; @@ -70,7 +76,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, ContactId contactId, TransportId transportId, - int maxLatency, StreamWriter streamWriter, + int maxLatency, boolean eager, StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; this.dbExecutor = dbExecutor; @@ -78,6 +84,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.contactId = contactId; this.transportId = transportId; this.maxLatency = maxLatency; + this.eager = eager; this.streamWriter = streamWriter; this.recordWriter = recordWriter; outstandingQueries = new AtomicInteger(2); // One per type of record @@ -93,7 +100,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener { recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); // Start a query for each type of record dbExecutor.execute(new GenerateAck()); - dbExecutor.execute(new GenerateBatch()); + if (eager) dbExecutor.execute(new LoadUnackedMessageIds()); + else dbExecutor.execute(new GenerateBatch()); // Write records until interrupted or no more records to write try { while (!interrupted) { @@ -138,6 +146,91 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } } + private class LoadUnackedMessageIds implements Runnable { + + @DatabaseExecutor + @Override + public void run() { + if (interrupted) return; + try { + Map ids = + db.transactionWithResult(true, txn -> + db.getUnackedMessagesToSend(txn, contactId)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " unacked messages to send"); + } + if (ids.isEmpty()) decrementOutstandingQueries(); + else dbExecutor.execute(new GenerateEagerBatch(ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); + } + } + } + + private class GenerateEagerBatch implements Runnable { + + private final Map ids; + + private GenerateEagerBatch(Map ids) { + this.ids = ids; + } + + @DatabaseExecutor + @Override + public void run() { + if (interrupted) return; + // Take some message IDs from `ids` to form a batch + Collection batchIds = new ArrayList<>(); + long totalLength = 0; + Iterator> it = + ids.entrySet().iterator(); + while (it.hasNext()) { + // Check whether the next message will fit in the batch + Entry e = it.next(); + int length = e.getValue(); + if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break; + // Add the message to the batch + it.remove(); + batchIds.add(e.getKey()); + totalLength += length; + } + if (batchIds.isEmpty()) throw new AssertionError(); + try { + Collection batch = + db.transactionWithResult(false, txn -> + db.generateBatch(txn, contactId, batchIds, + maxLatency)); + writerTasks.add(new WriteEagerBatch(batch, ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); + } + } + } + + private class WriteEagerBatch implements ThrowingRunnable { + + private final Collection batch; + private final Map ids; + + private WriteEagerBatch(Collection batch, + Map ids) { + this.batch = batch; + this.ids = ids; + } + + @IoExecutor + @Override + public void run() throws IOException { + if (interrupted) return; + for (Message m : batch) recordWriter.writeMessage(m); + LOG.info("Sent eager batch"); + if (ids.isEmpty()) decrementOutstandingQueries(); + else dbExecutor.execute(new GenerateEagerBatch(ids)); + } + } + private class GenerateAck implements Runnable { @DatabaseExecutor diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 4c590df73..74ec3e51a 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -65,7 +65,7 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, - maxLatency, streamWriter, recordWriter); + maxLatency, false, streamWriter, recordWriter); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index df8dc7b87..0dd7baf2f 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -46,7 +46,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter); + false, streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -80,7 +80,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { Ack ack = new Ack(singletonList(messageId)); SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter); + false, streamWriter, recordWriter); Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false);