From 96228c1fd042481583d31be65c1a656ff98d800d Mon Sep 17 00:00:00 2001 From: akwizgran Date: Wed, 22 Dec 2021 12:58:26 +0000 Subject: [PATCH] Do all of SimplexOutgoingSession's work on the IoExecutor. --- .../bramble/sync/SimplexOutgoingSession.java | 166 ++++++------------ .../bramble/sync/SyncSessionFactoryImpl.java | 4 +- .../sync/SimplexOutgoingSessionTest.java | 21 +-- 3 files changed, 62 insertions(+), 129 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 90c6be0b4..3e6075614 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -3,7 +3,6 @@ package org.briarproject.bramble.sync; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.event.ContactRemovedEvent; import org.briarproject.bramble.api.db.DatabaseComponent; -import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.EventBus; @@ -28,10 +27,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; @@ -57,11 +52,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private static final Logger LOG = getLogger(SimplexOutgoingSession.class.getName()); - private static final ThrowingRunnable CLOSE = () -> { - }; - private final DatabaseComponent db; - private final Executor dbExecutor; private final EventBus eventBus; private final ContactId contactId; private final TransportId transportId; @@ -69,17 +60,18 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final boolean eager; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; - private final AtomicInteger outstandingQueries; - private final BlockingQueue> writerTasks; private volatile boolean interrupted = false; - SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, - EventBus eventBus, ContactId contactId, TransportId transportId, - long maxLatency, boolean eager, StreamWriter streamWriter, + SimplexOutgoingSession(DatabaseComponent db, + EventBus eventBus, + ContactId contactId, + TransportId transportId, + long maxLatency, + boolean eager, + StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; - this.dbExecutor = dbExecutor; this.eventBus = eventBus; this.contactId = contactId; this.transportId = transportId; @@ -87,8 +79,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.eager = eager; this.streamWriter = streamWriter; this.recordWriter = recordWriter; - outstandingQueries = new AtomicInteger(2); // One per type of record - writerTasks = new LinkedBlockingQueue<>(); } @IoExecutor @@ -98,22 +88,22 @@ class SimplexOutgoingSession implements SyncSession, EventListener { try { // Send our supported protocol versions recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); - // Start a query for each type of record - dbExecutor.execute(this::generateAck); - if (eager) dbExecutor.execute(this::loadUnackedMessageIds); - else dbExecutor.execute(this::generateBatch); - // Write records until interrupted or no more records to write try { - while (!interrupted) { - ThrowingRunnable task = writerTasks.take(); - if (task == CLOSE) break; - task.run(); + // Send any waiting acks + while (!interrupted) if (!generateAndWriteAck()) break; + // Send any waiting messages + if (eager) { + Map ids = loadUnackedMessageIds(); + while (!interrupted && !ids.isEmpty()) { + generateAndWriteEagerBatch(ids); + } + } else { + while (!interrupted) if (!generateAndWriteBatch()) break; } - streamWriter.sendEndOfStream(); - } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for a record to write"); - Thread.currentThread().interrupt(); + } catch (DbException e) { + logException(LOG, WARNING, e); } + streamWriter.sendEndOfStream(); } finally { eventBus.removeListener(this); } @@ -122,11 +112,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener { @Override public void interrupt() { interrupted = true; - writerTasks.add(CLOSE); - } - - private void decrementOutstandingQueries() { - if (outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE); } @Override @@ -146,26 +131,17 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } } - @DatabaseExecutor - private void loadUnackedMessageIds() { - if (interrupted) return; - try { - Map ids = db.transactionWithResult(true, txn -> - db.getUnackedMessagesToSend(txn, contactId)); - if (LOG.isLoggable(INFO)) { - LOG.info(ids.size() + " unacked messages to send"); - } - if (ids.isEmpty()) decrementOutstandingQueries(); - else dbExecutor.execute(() -> generateEagerBatch(ids)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); + private Map loadUnackedMessageIds() throws DbException { + Map ids = db.transactionWithResult(true, txn -> + db.getUnackedMessagesToSend(txn, contactId)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " unacked messages to send"); } + return ids; } - @DatabaseExecutor - private void generateEagerBatch(Map ids) { - if (interrupted) return; + private void generateAndWriteEagerBatch(Map ids) + throws DbException, IOException { // Take some message IDs from `ids` to form a batch Collection batchIds = new ArrayList<>(); long totalLength = 0; @@ -181,75 +157,35 @@ class SimplexOutgoingSession implements SyncSession, EventListener { totalLength += length; } if (batchIds.isEmpty()) throw new AssertionError(); - try { - Collection batch = - db.transactionWithResult(false, txn -> - db.generateBatch(txn, contactId, batchIds, - maxLatency)); - writerTasks.add(() -> writeEagerBatch(batch, ids)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); + Collection b = db.transactionWithResult(false, txn -> + db.generateBatch(txn, contactId, batchIds, maxLatency)); + // The batch may be empty if some of the messages are no longer shared + if (!b.isEmpty()) { + for (Message m : b) recordWriter.writeMessage(m); + LOG.info("Sent eager batch"); } } - @IoExecutor - private void writeEagerBatch(Collection batch, - Map ids) throws IOException { - if (interrupted) return; - for (Message m : batch) recordWriter.writeMessage(m); - LOG.info("Sent eager batch"); - if (ids.isEmpty()) decrementOutstandingQueries(); - else dbExecutor.execute(() -> generateEagerBatch(ids)); - } - - @DatabaseExecutor - private void generateAck() { - if (interrupted) return; - try { - Ack a = db.transactionWithNullableResult(false, txn -> - db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated ack: " + (a != null)); - if (a == null) decrementOutstandingQueries(); - else writerTasks.add(() -> writeAck(a)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } - } - - @IoExecutor - private void writeAck(Ack ack) throws IOException { - if (interrupted) return; - recordWriter.writeAck(ack); + private boolean generateAndWriteAck() throws DbException, IOException { + Ack a = db.transactionWithNullableResult(false, txn -> + db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated ack: " + (a != null)); + if (a == null) return false; + recordWriter.writeAck(a); LOG.info("Sent ack"); - dbExecutor.execute(this::generateAck); + return true; } - @DatabaseExecutor - private void generateBatch() { - if (interrupted) return; - try { - Collection b = - db.transactionWithNullableResult(false, txn -> - db.generateBatch(txn, contactId, - MAX_RECORD_PAYLOAD_BYTES, maxLatency)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated batch: " + (b != null)); - if (b == null) decrementOutstandingQueries(); - else writerTasks.add(() -> writeBatch(b)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } - } - - @IoExecutor - private void writeBatch(Collection batch) throws IOException { - if (interrupted) return; - for (Message m : batch) recordWriter.writeMessage(m); + private boolean generateAndWriteBatch() throws DbException, IOException { + Collection b = db.transactionWithNullableResult(false, txn -> + db.generateBatch(txn, contactId, + MAX_RECORD_PAYLOAD_BYTES, maxLatency)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated batch: " + (b != null)); + if (b == null) return false; + for (Message m : b) recordWriter.writeMessage(m); LOG.info("Sent batch"); - dbExecutor.execute(this::generateBatch); + return true; } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 70fd0df8b..38aa6daa7 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -64,8 +64,8 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); - return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, - maxLatency, eager, streamWriter, recordWriter); + return new SimplexOutgoingSession(db, eventBus, c, t, maxLatency, + eager, streamWriter, recordWriter); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index 9bcf7cc3f..bd4bc35f0 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -14,12 +14,10 @@ import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.DbExpectations; -import org.briarproject.bramble.test.ImmediateExecutor; import org.junit.Test; import java.util.LinkedHashMap; import java.util.Map; -import java.util.concurrent.Executor; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -41,7 +39,6 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { private final SyncRecordWriter recordWriter = context.mock(SyncRecordWriter.class); - private final Executor dbExecutor = new ImmediateExecutor(); private final ContactId contactId = getContactId(); private final TransportId transportId = getTransportId(); private final Ack ack = @@ -54,7 +51,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); @@ -88,7 +85,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testNothingToSendEagerly() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, true, streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); @@ -121,7 +118,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testSomethingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); Transaction ackTxn = new Transaction(null, false); @@ -140,6 +137,11 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); will(returnValue(ack)); oneOf(recordWriter).writeAck(ack); + // No more acks + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); // One message to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(msgTxn)); @@ -147,11 +149,6 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(singletonList(message))); oneOf(recordWriter).writeMessage(message); - // No more acks - oneOf(db).transactionWithNullableResult(with(false), - withNullableDbCallable(noAckTxn)); - oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); - will(returnValue(null)); // No more messages oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); @@ -170,7 +167,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testSomethingToSendEagerly() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, true, streamWriter, recordWriter); Map unacked = new LinkedHashMap<>();