Avoid queueing redundant DB tasks during sync.

This commit is contained in:
akwizgran
2018-01-31 17:26:42 +00:00
parent 347c2f22c1
commit 14a9614c35

View File

@@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
@@ -54,7 +55,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName()); Logger.getLogger(DuplexOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE = () -> {}; private static final ThrowingRunnable<IOException> CLOSE = () -> {
};
private final DatabaseComponent db; private final DatabaseComponent db;
private final Executor dbExecutor; private final Executor dbExecutor;
@@ -65,6 +67,12 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final RecordWriter recordWriter; private final RecordWriter recordWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private final AtomicBoolean generateAckQueued = new AtomicBoolean(false);
private final AtomicBoolean generateBatchQueued = new AtomicBoolean(false);
private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false);
private final AtomicBoolean generateRequestQueued =
new AtomicBoolean(false);
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
@@ -87,10 +95,10 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
eventBus.addListener(this); eventBus.addListener(this);
try { try {
// Start a query for each type of record // Start a query for each type of record
dbExecutor.execute(new GenerateAck()); generateAck();
dbExecutor.execute(new GenerateBatch()); generateBatch();
dbExecutor.execute(new GenerateOffer()); generateOffer();
dbExecutor.execute(new GenerateRequest()); generateRequest();
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
long nextKeepalive = now + maxIdleTime; long nextKeepalive = now + maxIdleTime;
long nextRetxQuery = now + RETX_QUERY_INTERVAL; long nextRetxQuery = now + RETX_QUERY_INTERVAL;
@@ -115,8 +123,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
now = clock.currentTimeMillis(); now = clock.currentTimeMillis();
if (now >= nextRetxQuery) { if (now >= nextRetxQuery) {
// Check for retransmittable records // Check for retransmittable records
dbExecutor.execute(new GenerateBatch()); generateBatch();
dbExecutor.execute(new GenerateOffer()); generateOffer();
nextRetxQuery = now + RETX_QUERY_INTERVAL; nextRetxQuery = now + RETX_QUERY_INTERVAL;
} }
if (now >= nextKeepalive) { if (now >= nextKeepalive) {
@@ -142,6 +150,26 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} }
} }
private void generateAck() {
if (generateAckQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateAck());
}
private void generateBatch() {
if (generateBatchQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateBatch());
}
private void generateOffer() {
if (generateOfferQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateOffer());
}
private void generateRequest() {
if (generateRequestQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateRequest());
}
@Override @Override
public void interrupt() { public void interrupt() {
interrupted = true; interrupted = true;
@@ -154,20 +182,20 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
ContactRemovedEvent c = (ContactRemovedEvent) e; ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt(); if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageSharedEvent) { } else if (e instanceof MessageSharedEvent) {
dbExecutor.execute(new GenerateOffer()); generateOffer();
} else if (e instanceof GroupVisibilityUpdatedEvent) { } else if (e instanceof GroupVisibilityUpdatedEvent) {
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e; GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
if (g.getAffectedContacts().contains(contactId)) if (g.getAffectedContacts().contains(contactId))
dbExecutor.execute(new GenerateOffer()); generateOffer();
} else if (e instanceof MessageRequestedEvent) { } else if (e instanceof MessageRequestedEvent) {
if (((MessageRequestedEvent) e).getContactId().equals(contactId)) if (((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch()); generateBatch();
} else if (e instanceof MessageToAckEvent) { } else if (e instanceof MessageToAckEvent) {
if (((MessageToAckEvent) e).getContactId().equals(contactId)) if (((MessageToAckEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateAck()); generateAck();
} else if (e instanceof MessageToRequestEvent) { } else if (e instanceof MessageToRequestEvent) {
if (((MessageToRequestEvent) e).getContactId().equals(contactId)) if (((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest()); generateRequest();
} else if (e instanceof ShutdownEvent) { } else if (e instanceof ShutdownEvent) {
interrupt(); interrupt();
} }
@@ -179,6 +207,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() { public void run() {
if (interrupted) return; if (interrupted) return;
if (!generateAckQueued.getAndSet(false)) throw new AssertionError();
try { try {
Ack a; Ack a;
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
@@ -212,7 +241,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return; if (interrupted) return;
recordWriter.writeAck(ack); recordWriter.writeAck(ack);
LOG.info("Sent ack"); LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck()); generateAck();
} }
} }
@@ -222,6 +251,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() { public void run() {
if (interrupted) return; if (interrupted) return;
if (!generateBatchQueued.getAndSet(false))
throw new AssertionError();
try { try {
Collection<byte[]> b; Collection<byte[]> b;
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
@@ -256,7 +287,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return; if (interrupted) return;
for (byte[] raw : batch) recordWriter.writeMessage(raw); for (byte[] raw : batch) recordWriter.writeMessage(raw);
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch()); generateBatch();
} }
} }
@@ -266,6 +297,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() { public void run() {
if (interrupted) return; if (interrupted) return;
if (!generateOfferQueued.getAndSet(false))
throw new AssertionError();
try { try {
Offer o; Offer o;
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
@@ -300,7 +333,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return; if (interrupted) return;
recordWriter.writeOffer(offer); recordWriter.writeOffer(offer);
LOG.info("Sent offer"); LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer()); generateOffer();
} }
} }
@@ -310,6 +343,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() { public void run() {
if (interrupted) return; if (interrupted) return;
if (!generateRequestQueued.getAndSet(false))
throw new AssertionError();
try { try {
Request r; Request r;
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
@@ -343,7 +378,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return; if (interrupted) return;
recordWriter.writeRequest(request); recordWriter.writeRequest(request);
LOG.info("Sent request"); LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest()); generateRequest();
} }
} }
} }