Use a separate DB task for loading each message.

Each task is added to the back of the executor's queue, allowing tasks from other callers to be interleaved.
This commit is contained in:
akwizgran
2016-02-26 14:15:25 +00:00
parent 8c8b2a5358
commit 8ba95a2965

View File

@@ -9,6 +9,7 @@ import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.NoSuchGroupException;
import org.briarproject.api.db.NoSuchMessageException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
@@ -23,8 +24,10 @@ import org.briarproject.api.sync.MessageValidator;
import org.briarproject.api.sync.ValidationManager;
import org.briarproject.util.ByteUtils;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -81,19 +84,48 @@ class ValidationManagerImpl implements ValidationManager, Service,
dbExecutor.execute(new Runnable() {
public void run() {
try {
// TODO: Don't do all of this in a single DB task
Queue<MessageId> unvalidated = new LinkedList<MessageId>();
Transaction txn = db.startTransaction();
try {
for (MessageId id : db.getMessagesToValidate(txn, c)) {
byte[] raw = db.getRawMessage(txn, id);
Message m = parseMessage(id, raw);
Group g = db.getGroup(txn, m.getGroupId());
validateMessage(m, g);
}
unvalidated.addAll(db.getMessagesToValidate(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
validateNextMessage(unvalidated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
private void validateNextMessage(final Queue<MessageId> unvalidated) {
if (unvalidated.isEmpty()) return;
dbExecutor.execute(new Runnable() {
public void run() {
try {
Message m = null;
Group g = null;
Transaction txn = db.startTransaction();
try {
MessageId id = unvalidated.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} catch (NoSuchMessageException e) {
LOG.info("Message removed before validation");
// Continue to next message
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
// Continue to next message
} finally {
db.endTransaction(txn);
}
if (m != null && g != null) validateMessage(m, g);
validateNextMessage(unvalidated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
@@ -158,21 +190,23 @@ class ValidationManagerImpl implements ValidationManager, Service,
if (e instanceof MessageAddedEvent) {
// Validate the message if it wasn't created locally
MessageAddedEvent m = (MessageAddedEvent) e;
if (m.getContactId() != null) loadGroup(m.getMessage());
if (m.getContactId() != null) loadGroupAndValidate(m.getMessage());
}
}
private void loadGroup(final Message m) {
private void loadGroupAndValidate(final Message m) {
dbExecutor.execute(new Runnable() {
public void run() {
try {
Group g;
Transaction txn = db.startTransaction();
try {
validateMessage(m, db.getGroup(txn, m.getGroupId()));
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
validateMessage(m, g);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
} catch (DbException e) {