Replace inner classes with lambdas.

This commit is contained in:
akwizgran
2021-06-10 17:04:15 +01:00
committed by Torsten Grote
parent 9ac72296c7
commit 8be274dc4d

View File

@@ -99,9 +99,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
// Send our supported protocol versions // Send our supported protocol versions
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
// Start a query for each type of record // Start a query for each type of record
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(this::generateAck);
if (eager) dbExecutor.execute(new LoadUnackedMessageIds()); if (eager) dbExecutor.execute(this::loadUnackedMessageIds);
else dbExecutor.execute(new GenerateBatch()); else dbExecutor.execute(this::generateBatch);
// Write records until interrupted or no more records to write // Write records until interrupted or no more records to write
try { try {
while (!interrupted) { while (!interrupted) {
@@ -146,45 +146,30 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
} }
} }
private class LoadUnackedMessageIds implements Runnable {
@DatabaseExecutor @DatabaseExecutor
@Override private void loadUnackedMessageIds() {
public void run() {
if (interrupted) return; if (interrupted) return;
try { try {
Map<MessageId, Integer> ids = Map<MessageId, Integer> ids = db.transactionWithResult(true, txn ->
db.transactionWithResult(true, txn ->
db.getUnackedMessagesToSend(txn, contactId)); db.getUnackedMessagesToSend(txn, contactId));
if (LOG.isLoggable(INFO)) { if (LOG.isLoggable(INFO)) {
LOG.info(ids.size() + " unacked messages to send"); LOG.info(ids.size() + " unacked messages to send");
} }
if (ids.isEmpty()) decrementOutstandingQueries(); if (ids.isEmpty()) decrementOutstandingQueries();
else dbExecutor.execute(new GenerateEagerBatch(ids)); else dbExecutor.execute(() -> generateEagerBatch(ids));
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
interrupt(); interrupt();
} }
} }
}
private class GenerateEagerBatch implements Runnable {
private final Map<MessageId, Integer> ids;
private GenerateEagerBatch(Map<MessageId, Integer> ids) {
this.ids = ids;
}
@DatabaseExecutor @DatabaseExecutor
@Override private void generateEagerBatch(Map<MessageId, Integer> ids) {
public void run() {
if (interrupted) return; if (interrupted) return;
// Take some message IDs from `ids` to form a batch // Take some message IDs from `ids` to form a batch
Collection<MessageId> batchIds = new ArrayList<>(); Collection<MessageId> batchIds = new ArrayList<>();
long totalLength = 0; long totalLength = 0;
Iterator<Entry<MessageId, Integer>> it = Iterator<Entry<MessageId, Integer>> it = ids.entrySet().iterator();
ids.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
// Check whether the next message will fit in the batch // Check whether the next message will fit in the batch
Entry<MessageId, Integer> e = it.next(); Entry<MessageId, Integer> e = it.next();
@@ -201,41 +186,25 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
db.transactionWithResult(false, txn -> db.transactionWithResult(false, txn ->
db.generateBatch(txn, contactId, batchIds, db.generateBatch(txn, contactId, batchIds,
maxLatency)); maxLatency));
writerTasks.add(new WriteEagerBatch(batch, ids)); writerTasks.add(() -> writeEagerBatch(batch, ids));
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
interrupt(); interrupt();
} }
} }
}
private class WriteEagerBatch implements ThrowingRunnable<IOException> {
private final Collection<Message> batch;
private final Map<MessageId, Integer> ids;
private WriteEagerBatch(Collection<Message> batch,
Map<MessageId, Integer> ids) {
this.batch = batch;
this.ids = ids;
}
@IoExecutor @IoExecutor
@Override private void writeEagerBatch(Collection<Message> batch,
public void run() throws IOException { Map<MessageId, Integer> ids) throws IOException {
if (interrupted) return; if (interrupted) return;
for (Message m : batch) recordWriter.writeMessage(m); for (Message m : batch) recordWriter.writeMessage(m);
LOG.info("Sent eager batch"); LOG.info("Sent eager batch");
if (ids.isEmpty()) decrementOutstandingQueries(); if (ids.isEmpty()) decrementOutstandingQueries();
else dbExecutor.execute(new GenerateEagerBatch(ids)); else dbExecutor.execute(() -> generateEagerBatch(ids));
} }
}
private class GenerateAck implements Runnable {
@DatabaseExecutor @DatabaseExecutor
@Override private void generateAck() {
public void run() {
if (interrupted) return; if (interrupted) return;
try { try {
Ack a = db.transactionWithNullableResult(false, txn -> Ack a = db.transactionWithNullableResult(false, txn ->
@@ -243,37 +212,23 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null)); LOG.info("Generated ack: " + (a != null));
if (a == null) decrementOutstandingQueries(); if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteAck(a)); else writerTasks.add(() -> writeAck(a));
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
interrupt(); interrupt();
} }
} }
}
private class WriteAck implements ThrowingRunnable<IOException> {
private final Ack ack;
private WriteAck(Ack ack) {
this.ack = ack;
}
@IoExecutor @IoExecutor
@Override private void writeAck(Ack ack) throws IOException {
public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
recordWriter.writeAck(ack); recordWriter.writeAck(ack);
LOG.info("Sent ack"); LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(this::generateAck);
} }
}
private class GenerateBatch implements Runnable {
@DatabaseExecutor @DatabaseExecutor
@Override private void generateBatch() {
public void run() {
if (interrupted) return; if (interrupted) return;
try { try {
Collection<Message> b = Collection<Message> b =
@@ -283,29 +238,18 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
if (LOG.isLoggable(INFO)) if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null)); LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries(); if (b == null) decrementOutstandingQueries();
else writerTasks.add(new WriteBatch(b)); else writerTasks.add(() -> writeBatch(b));
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
interrupt(); interrupt();
} }
} }
}
private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<Message> batch;
private WriteBatch(Collection<Message> batch) {
this.batch = batch;
}
@IoExecutor @IoExecutor
@Override private void writeBatch(Collection<Message> batch) throws IOException {
public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
for (Message m : batch) recordWriter.writeMessage(m); for (Message m : batch) recordWriter.writeMessage(m);
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(this::generateBatch);
}
} }
} }