Avoid queueing redundant DB tasks during sync.

This commit is contained in:
akwizgran
2018-01-31 17:26:42 +00:00
parent 7ec05ac0cd
commit ea3ada5573

View File

@@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
@@ -54,7 +55,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE = () -> {};
private static final ThrowingRunnable<IOException> CLOSE = () -> {
};
private final DatabaseComponent db;
private final Executor dbExecutor;
@@ -65,6 +67,12 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final RecordWriter recordWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private final AtomicBoolean generateAckQueued = new AtomicBoolean(false);
private final AtomicBoolean generateBatchQueued = new AtomicBoolean(false);
private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false);
private final AtomicBoolean generateRequestQueued =
new AtomicBoolean(false);
private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
@@ -87,10 +95,10 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
eventBus.addListener(this);
try {
// Start a query for each type of record
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
dbExecutor.execute(new GenerateRequest());
generateAck();
generateBatch();
generateOffer();
generateRequest();
long now = clock.currentTimeMillis();
long nextKeepalive = now + maxIdleTime;
long nextRetxQuery = now + RETX_QUERY_INTERVAL;
@@ -115,8 +123,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
now = clock.currentTimeMillis();
if (now >= nextRetxQuery) {
// Check for retransmittable records
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
generateBatch();
generateOffer();
nextRetxQuery = now + RETX_QUERY_INTERVAL;
}
if (now >= nextKeepalive) {
@@ -142,6 +150,26 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
}
}
private void generateAck() {
if (generateAckQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateAck());
}
private void generateBatch() {
if (generateBatchQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateBatch());
}
private void generateOffer() {
if (generateOfferQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateOffer());
}
private void generateRequest() {
if (generateRequestQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateRequest());
}
@Override
public void interrupt() {
interrupted = true;
@@ -154,20 +182,20 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageSharedEvent) {
dbExecutor.execute(new GenerateOffer());
generateOffer();
} else if (e instanceof GroupVisibilityUpdatedEvent) {
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
if (g.getAffectedContacts().contains(contactId))
dbExecutor.execute(new GenerateOffer());
generateOffer();
} else if (e instanceof MessageRequestedEvent) {
if (((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch());
generateBatch();
} else if (e instanceof MessageToAckEvent) {
if (((MessageToAckEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateAck());
generateAck();
} else if (e instanceof MessageToRequestEvent) {
if (((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest());
generateRequest();
} else if (e instanceof ShutdownEvent) {
interrupt();
}
@@ -179,6 +207,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override
public void run() {
if (interrupted) return;
if (!generateAckQueued.getAndSet(false)) throw new AssertionError();
try {
Ack a;
Transaction txn = db.startTransaction(false);
@@ -212,7 +241,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
recordWriter.writeAck(ack);
LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck());
generateAck();
}
}
@@ -222,6 +251,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override
public void run() {
if (interrupted) return;
if (!generateBatchQueued.getAndSet(false))
throw new AssertionError();
try {
Collection<byte[]> b;
Transaction txn = db.startTransaction(false);
@@ -256,7 +287,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
for (byte[] raw : batch) recordWriter.writeMessage(raw);
LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
generateBatch();
}
}
@@ -266,6 +297,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override
public void run() {
if (interrupted) return;
if (!generateOfferQueued.getAndSet(false))
throw new AssertionError();
try {
Offer o;
Transaction txn = db.startTransaction(false);
@@ -300,7 +333,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
recordWriter.writeOffer(offer);
LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer());
generateOffer();
}
}
@@ -310,6 +343,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override
public void run() {
if (interrupted) return;
if (!generateRequestQueued.getAndSet(false))
throw new AssertionError();
try {
Request r;
Transaction txn = db.startTransaction(false);
@@ -343,7 +378,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
recordWriter.writeRequest(request);
LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest());
generateRequest();
}
}
}