Do all of SimplexOutgoingSession's work on the IoExecutor.

This commit is contained in:
akwizgran
2021-12-22 12:58:26 +00:00
parent 7bd220f18d
commit 96228c1fd0
3 changed files with 62 additions and 129 deletions

View File

@@ -3,7 +3,6 @@ package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.event.ContactRemovedEvent; import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
@@ -28,10 +27,6 @@ import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
@@ -57,11 +52,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private static final Logger LOG = private static final Logger LOG =
getLogger(SimplexOutgoingSession.class.getName()); getLogger(SimplexOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE = () -> {
};
private final DatabaseComponent db; private final DatabaseComponent db;
private final Executor dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
@@ -69,17 +60,18 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final boolean eager; private final boolean eager;
private final StreamWriter streamWriter; private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter; private final SyncRecordWriter recordWriter;
private final AtomicInteger outstandingQueries;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, SimplexOutgoingSession(DatabaseComponent db,
EventBus eventBus, ContactId contactId, TransportId transportId, EventBus eventBus,
long maxLatency, boolean eager, StreamWriter streamWriter, ContactId contactId,
TransportId transportId,
long maxLatency,
boolean eager,
StreamWriter streamWriter,
SyncRecordWriter recordWriter) { SyncRecordWriter recordWriter) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
@@ -87,8 +79,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
this.eager = eager; this.eager = eager;
this.streamWriter = streamWriter; this.streamWriter = streamWriter;
this.recordWriter = recordWriter; this.recordWriter = recordWriter;
outstandingQueries = new AtomicInteger(2); // One per type of record
writerTasks = new LinkedBlockingQueue<>();
} }
@IoExecutor @IoExecutor
@@ -98,22 +88,22 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
try { try {
// Send our supported protocol versions // Send our supported protocol versions
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
// Start a query for each type of record
dbExecutor.execute(this::generateAck);
if (eager) dbExecutor.execute(this::loadUnackedMessageIds);
else dbExecutor.execute(this::generateBatch);
// Write records until interrupted or no more records to write
try { try {
while (!interrupted) { // Send any waiting acks
ThrowingRunnable<IOException> task = writerTasks.take(); while (!interrupted) if (!generateAndWriteAck()) break;
if (task == CLOSE) break; // Send any waiting messages
task.run(); if (eager) {
Map<MessageId, Integer> ids = loadUnackedMessageIds();
while (!interrupted && !ids.isEmpty()) {
generateAndWriteEagerBatch(ids);
}
} else {
while (!interrupted) if (!generateAndWriteBatch()) break;
} }
streamWriter.sendEndOfStream(); } catch (DbException e) {
} catch (InterruptedException e) { logException(LOG, WARNING, e);
LOG.info("Interrupted while waiting for a record to write");
Thread.currentThread().interrupt();
} }
streamWriter.sendEndOfStream();
} finally { } finally {
eventBus.removeListener(this); eventBus.removeListener(this);
} }
@@ -122,11 +112,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void interrupt() { public void interrupt() {
interrupted = true; interrupted = true;
writerTasks.add(CLOSE);
}
private void decrementOutstandingQueries() {
if (outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE);
} }
@Override @Override
@@ -146,26 +131,17 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
} }
} }
@DatabaseExecutor private Map<MessageId, Integer> loadUnackedMessageIds() throws DbException {
private void loadUnackedMessageIds() { Map<MessageId, Integer> ids = db.transactionWithResult(true, txn ->
if (interrupted) return; db.getUnackedMessagesToSend(txn, contactId));
try { if (LOG.isLoggable(INFO)) {
Map<MessageId, Integer> ids = db.transactionWithResult(true, txn -> LOG.info(ids.size() + " unacked messages to send");
db.getUnackedMessagesToSend(txn, contactId));
if (LOG.isLoggable(INFO)) {
LOG.info(ids.size() + " unacked messages to send");
}
if (ids.isEmpty()) decrementOutstandingQueries();
else dbExecutor.execute(() -> generateEagerBatch(ids));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
} }
return ids;
} }
@DatabaseExecutor private void generateAndWriteEagerBatch(Map<MessageId, Integer> ids)
private void generateEagerBatch(Map<MessageId, Integer> ids) { throws DbException, IOException {
if (interrupted) return;
// Take some message IDs from `ids` to form a batch // Take some message IDs from `ids` to form a batch
Collection<MessageId> batchIds = new ArrayList<>(); Collection<MessageId> batchIds = new ArrayList<>();
long totalLength = 0; long totalLength = 0;
@@ -181,75 +157,35 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
totalLength += length; totalLength += length;
} }
if (batchIds.isEmpty()) throw new AssertionError(); if (batchIds.isEmpty()) throw new AssertionError();
try { Collection<Message> b = db.transactionWithResult(false, txn ->
Collection<Message> batch = db.generateBatch(txn, contactId, batchIds, maxLatency));
db.transactionWithResult(false, txn -> // The batch may be empty if some of the messages are no longer shared
db.generateBatch(txn, contactId, batchIds, if (!b.isEmpty()) {
maxLatency)); for (Message m : b) recordWriter.writeMessage(m);
writerTasks.add(() -> writeEagerBatch(batch, ids)); LOG.info("Sent eager batch");
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
} }
} }
@IoExecutor private boolean generateAndWriteAck() throws DbException, IOException {
private void writeEagerBatch(Collection<Message> batch, Ack a = db.transactionWithNullableResult(false, txn ->
Map<MessageId, Integer> ids) throws IOException { db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
if (interrupted) return; if (LOG.isLoggable(INFO))
for (Message m : batch) recordWriter.writeMessage(m); LOG.info("Generated ack: " + (a != null));
LOG.info("Sent eager batch"); if (a == null) return false;
if (ids.isEmpty()) decrementOutstandingQueries(); recordWriter.writeAck(a);
else dbExecutor.execute(() -> generateEagerBatch(ids));
}
@DatabaseExecutor
private void generateAck() {
if (interrupted) return;
try {
Ack a = db.transactionWithNullableResult(false, txn ->
db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(() -> writeAck(a));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
}
@IoExecutor
private void writeAck(Ack ack) throws IOException {
if (interrupted) return;
recordWriter.writeAck(ack);
LOG.info("Sent ack"); LOG.info("Sent ack");
dbExecutor.execute(this::generateAck); return true;
} }
@DatabaseExecutor private boolean generateAndWriteBatch() throws DbException, IOException {
private void generateBatch() { Collection<Message> b = db.transactionWithNullableResult(false, txn ->
if (interrupted) return; db.generateBatch(txn, contactId,
try { MAX_RECORD_PAYLOAD_BYTES, maxLatency));
Collection<Message> b = if (LOG.isLoggable(INFO))
db.transactionWithNullableResult(false, txn -> LOG.info("Generated batch: " + (b != null));
db.generateBatch(txn, contactId, if (b == null) return false;
MAX_RECORD_PAYLOAD_BYTES, maxLatency)); for (Message m : b) recordWriter.writeMessage(m);
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();
else writerTasks.add(() -> writeBatch(b));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
}
@IoExecutor
private void writeBatch(Collection<Message> batch) throws IOException {
if (interrupted) return;
for (Message m : batch) recordWriter.writeMessage(m);
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(this::generateBatch); return true;
} }
} }

View File

@@ -64,8 +64,8 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
OutputStream out = streamWriter.getOutputStream(); OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter = SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out); recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, return new SimplexOutgoingSession(db, eventBus, c, t, maxLatency,
maxLatency, eager, streamWriter, recordWriter); eager, streamWriter, recordWriter);
} }
@Override @Override

View File

@@ -14,12 +14,10 @@ import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.DbExpectations; import org.briarproject.bramble.test.DbExpectations;
import org.briarproject.bramble.test.ImmediateExecutor;
import org.junit.Test; import org.junit.Test;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
@@ -41,7 +39,6 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final SyncRecordWriter recordWriter = private final SyncRecordWriter recordWriter =
context.mock(SyncRecordWriter.class); context.mock(SyncRecordWriter.class);
private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = getContactId(); private final ContactId contactId = getContactId();
private final TransportId transportId = getTransportId(); private final TransportId transportId = getTransportId();
private final Ack ack = private final Ack ack =
@@ -54,7 +51,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@Test @Test
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, eventBus, contactId, transportId, MAX_LATENCY,
false, streamWriter, recordWriter); false, streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
@@ -88,7 +85,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@Test @Test
public void testNothingToSendEagerly() throws Exception { public void testNothingToSendEagerly() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, eventBus, contactId, transportId, MAX_LATENCY,
true, streamWriter, recordWriter); true, streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
@@ -121,7 +118,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@Test @Test
public void testSomethingToSend() throws Exception { public void testSomethingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, eventBus, contactId, transportId, MAX_LATENCY,
false, streamWriter, recordWriter); false, streamWriter, recordWriter);
Transaction ackTxn = new Transaction(null, false); Transaction ackTxn = new Transaction(null, false);
@@ -140,6 +137,11 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(ack)); will(returnValue(ack));
oneOf(recordWriter).writeAck(ack); oneOf(recordWriter).writeAck(ack);
// No more acks
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
// One message to send // One message to send
oneOf(db).transactionWithNullableResult(with(false), oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(msgTxn)); withNullableDbCallable(msgTxn));
@@ -147,11 +149,6 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
will(returnValue(singletonList(message))); will(returnValue(singletonList(message)));
oneOf(recordWriter).writeMessage(message); oneOf(recordWriter).writeMessage(message);
// No more acks
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
// No more messages // No more messages
oneOf(db).transactionWithNullableResult(with(false), oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn)); withNullableDbCallable(noMsgTxn));
@@ -170,7 +167,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@Test @Test
public void testSomethingToSendEagerly() throws Exception { public void testSomethingToSendEagerly() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, eventBus, contactId, transportId, MAX_LATENCY,
true, streamWriter, recordWriter); true, streamWriter, recordWriter);
Map<MessageId, Integer> unacked = new LinkedHashMap<>(); Map<MessageId, Integer> unacked = new LinkedHashMap<>();