Refactor ValidationManager and fix some bugs. #619

This commit is contained in:
akwizgran
2016-09-08 14:57:41 +01:00
parent fd4275733f
commit 8a3e5bfb50
34 changed files with 978 additions and 922 deletions

View File

@@ -41,7 +41,6 @@ import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.sync.ValidationManager.State.DELIVERED;
import static org.briarproject.api.sync.ValidationManager.State.INVALID;
import static org.briarproject.api.sync.ValidationManager.State.PENDING;
import static org.briarproject.api.sync.ValidationManager.State.VALID;
class ValidationManagerImpl implements ValidationManager, Service,
EventListener {
@@ -71,8 +70,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
public void startService() {
if (used.getAndSet(true)) throw new IllegalStateException();
for (ClientId c : validators.keySet()) {
validateOutstandingMessages(c);
deliverOutstandingMessages(c);
validateOutstandingMessagesAsync(c);
deliverOutstandingMessagesAsync(c);
}
}
@@ -91,168 +90,156 @@ class ValidationManagerImpl implements ValidationManager, Service,
hooks.put(c, hook);
}
private void validateOutstandingMessages(final ClientId c) {
private void validateOutstandingMessagesAsync(final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Queue<MessageId> unvalidated = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(true);
try {
unvalidated.addAll(db.getMessagesToValidate(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
validateNextMessage(unvalidated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
validateOutstandingMessages(c);
}
});
}
private void validateNextMessage(final Queue<MessageId> unvalidated) {
private void validateOutstandingMessages(ClientId c) {
try {
Queue<MessageId> unvalidated = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(true);
try {
unvalidated.addAll(db.getMessagesToValidate(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
validateNextMessageAsync(unvalidated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void validateNextMessageAsync(final Queue<MessageId> unvalidated) {
if (unvalidated.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Message m = null;
Group g = null;
Transaction txn = db.startTransaction(true);
try {
MessageId id = unvalidated.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} catch (NoSuchMessageException e) {
LOG.info("Message removed before validation");
// Continue to next message
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
// Continue to next message
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
if (m != null && g != null) validateMessage(m, g);
validateNextMessage(unvalidated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
validateNextMessage(unvalidated);
}
});
}
private void deliverOutstandingMessages(final ClientId c) {
private void validateNextMessage(Queue<MessageId> unvalidated) {
try {
Message m;
Group g;
Transaction txn = db.startTransaction(true);
try {
MessageId id = unvalidated.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
validateMessageAsync(m, g);
validateNextMessageAsync(unvalidated);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before validation");
validateNextMessageAsync(unvalidated);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
validateNextMessageAsync(unvalidated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void deliverOutstandingMessagesAsync(final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Queue<MessageId> validated = new LinkedList<MessageId>();
Queue<MessageId> pending = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(true);
try {
validated.addAll(db.getMessagesToDeliver(txn, c));
pending.addAll(db.getPendingMessages(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
deliverNextMessage(validated);
deliverNextPendingMessage(pending);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
deliverOutstandingMessages(c);
}
});
}
private void deliverNextMessage(final Queue<MessageId> validated) {
if (validated.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Message m = null;
Group g = null;
Metadata meta = null;
Transaction txn = db.startTransaction(true);
try {
MessageId id = validated.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
g = db.getGroup(txn, m.getGroupId());
meta = db.getMessageMetadataForValidator(txn, id);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (g != null) deliverMessage(m, g.getClientId(), meta);
deliverNextMessage(validated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
private void deliverOutstandingMessages(ClientId c) {
try {
Queue<MessageId> pending = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(true);
try {
pending.addAll(db.getPendingMessages(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
});
deliverNextPendingMessageAsync(pending);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void deliverNextPendingMessage(final Queue<MessageId> pending) {
private void deliverNextPendingMessageAsync(
final Queue<MessageId> pending) {
if (pending.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
Message m = null;
ClientId c = null;
try {
boolean allDelivered = true;
Metadata meta = null;
Transaction txn = db.startTransaction(true);
try {
MessageId id = pending.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
Group g = db.getGroup(txn, m.getGroupId());
c = g.getClientId();
// check if a dependency is invalid
Map<MessageId, State> states =
db.getMessageDependencies(txn, id);
for (Entry<MessageId, State> d : states.entrySet()) {
if (d.getValue() == INVALID) {
throw new InvalidMessageException(
"Invalid Dependency");
}
if (d.getValue() != DELIVERED) allDelivered = false;
}
if (allDelivered)
meta = db.getMessageMetadataForValidator(txn, id);
txn.setComplete();
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
if (c != null && allDelivered) deliverMessage(m, c, meta);
deliverNextPendingMessage(pending);
} catch(InvalidMessageException e) {
if (LOG.isLoggable(INFO))
LOG.log(INFO, e.toString(), e);
markMessageInvalid(m, c);
deliverNextPendingMessage(pending);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
deliverNextPendingMessage(pending);
}
});
}
private void deliverNextPendingMessage(Queue<MessageId> pending) {
try {
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> invalidate = null;
Transaction txn = db.startTransaction(false);
try {
MessageId id = pending.poll();
// Check if message is still pending
if (db.getMessageState(txn, id) == PENDING) {
// Check if dependencies are valid and delivered
Map<MessageId, State> states =
db.getMessageDependencies(txn, id);
for (Entry<MessageId, State> e : states.entrySet()) {
if (e.getValue() == INVALID) anyInvalid = true;
if (e.getValue() != DELIVERED) allDelivered = false;
}
if (anyInvalid) {
if (db.getMessageState(txn, id) != INVALID) {
invalidateMessage(txn, id);
invalidate = getDependentsToInvalidate(txn, id);
}
} else if (allDelivered) {
Message m = parseMessage(id, db.getRawMessage(txn, id));
Group g = db.getGroup(txn, m.getGroupId());
ClientId c = g.getClientId();
Metadata meta = db.getMessageMetadataForValidator(txn,
id);
if (deliverMessage(txn, m, c, meta)) {
pending.addAll(getPendingDependents(txn, id));
} else {
invalidate = getDependentsToInvalidate(txn, id);
}
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
deliverNextPendingMessageAsync(pending);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before delivery");
deliverNextPendingMessageAsync(pending);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before delivery");
deliverNextPendingMessageAsync(pending);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
private Message parseMessage(MessageId id, byte[] raw) {
if (raw.length <= MESSAGE_HEADER_LENGTH)
throw new IllegalArgumentException();
@@ -262,199 +249,168 @@ class ValidationManagerImpl implements ValidationManager, Service,
return new Message(id, new GroupId(groupId), timestamp, raw);
}
private void validateMessage(final Message m, final Group g) {
private void validateMessageAsync(final Message m, final Group g) {
cryptoExecutor.execute(new Runnable() {
@Override
public void run() {
MessageValidator v = validators.get(g.getClientId());
if (v == null) {
LOG.warning("No validator");
} else {
try {
MessageContext context = v.validateMessage(m, g);
storeMessageContext(m, g.getClientId(), context);
} catch (InvalidMessageException e) {
if (LOG.isLoggable(INFO))
LOG.log(INFO, e.toString(), e);
markMessageInvalid(m, g.getClientId());
}
}
validateMessage(m, g);
}
});
}
private void storeMessageContext(final Message m, final ClientId c,
private void validateMessage(Message m, Group g) {
MessageValidator v = validators.get(g.getClientId());
if (v == null) {
LOG.warning("No validator");
} else {
try {
MessageContext context = v.validateMessage(m, g);
storeMessageContextAsync(m, g.getClientId(), context);
} catch (InvalidMessageException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
Queue<MessageId> invalidate = new LinkedList<MessageId>();
invalidate.add(m.getId());
invalidateNextMessageAsync(invalidate);
}
}
}
private void storeMessageContextAsync(final Message m, final ClientId c,
final MessageContext result) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
State newState = null;
Metadata meta = null;
Transaction txn = db.startTransaction(false);
try {
// store dependencies
Collection<MessageId> dependencies =
result.getDependencies();
if (dependencies != null && dependencies.size() > 0) {
db.addMessageDependencies(txn, m, dependencies);
}
// check if a dependency is invalid
// and if all dependencies have been delivered
Map<MessageId, State> states =
db.getMessageDependencies(txn, m.getId());
newState = VALID;
for (Entry<MessageId, State> d : states.entrySet()) {
if (d.getValue() == INVALID) {
throw new InvalidMessageException(
"Dependency Invalid");
}
if (d.getValue() != DELIVERED) {
newState = PENDING;
LOG.info("depend. undelivered, set to PENDING");
break;
}
}
// save metadata and new message state
meta = result.getMetadata();
db.mergeMessageMetadata(txn, m.getId(), meta);
db.setMessageState(txn, m, c, newState);
txn.setComplete();
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
// deliver message if valid
if (newState == VALID) {
deliverMessage(m, c, meta);
}
} catch (InvalidMessageException e) {
if (LOG.isLoggable(INFO))
LOG.log(INFO, e.toString(), e);
markMessageInvalid(m, c);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
storeMessageContext(m, c, result);
}
});
}
private void deliverMessage(final Message m, final ClientId c,
final Metadata meta) {
private void storeMessageContext(Message m, ClientId c,
MessageContext result) {
try {
MessageId id = m.getId();
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> invalidate = null;
Queue<MessageId> pending = null;
Transaction txn = db.startTransaction(false);
try {
// Check if message has any dependencies
Collection<MessageId> dependencies = result.getDependencies();
if (!dependencies.isEmpty()) {
db.addMessageDependencies(txn, m, dependencies);
// Check if dependencies are valid and delivered
Map<MessageId, State> states =
db.getMessageDependencies(txn, id);
for (Entry<MessageId, State> e : states.entrySet()) {
if (e.getValue() == INVALID) anyInvalid = true;
if (e.getValue() != DELIVERED) allDelivered = false;
}
}
if (anyInvalid) {
if (db.getMessageState(txn, id) != INVALID) {
invalidateMessage(txn, id);
invalidate = getDependentsToInvalidate(txn, id);
}
} else {
Metadata meta = result.getMetadata();
db.mergeMessageMetadata(txn, id, meta);
if (allDelivered) {
if (deliverMessage(txn, m, c, meta)) {
pending = getPendingDependents(txn, id);
} else {
invalidate = getDependentsToInvalidate(txn, id);
}
} else {
db.setMessageState(txn, id, PENDING);
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
if (pending != null) deliverNextPendingMessageAsync(pending);
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");
} catch (NoSuchGroupException e) {
LOG.info("Group removed during validation");
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private boolean deliverMessage(Transaction txn, Message m, ClientId c,
Metadata meta) throws DbException {
IncomingMessageHook hook = hooks.get(c);
if (hook == null) throw new DbException();
hook.incomingMessage(txn, m, meta);
// TODO: Find a better way for clients to signal validity, #643
if (db.getRawMessage(txn, m.getId()) == null) {
db.setMessageState(txn, m.getId(), INVALID);
return false;
} else {
db.setMessageState(txn, m.getId(), DELIVERED);
return true;
}
}
private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m)
throws DbException {
Queue<MessageId> pending = new LinkedList<MessageId>();
Map<MessageId, State> states = db.getMessageDependents(txn, m);
for (Entry<MessageId, State> e : states.entrySet()) {
if (e.getValue() == PENDING) pending.add(e.getKey());
}
return pending;
}
private void invalidateNextMessageAsync(final Queue<MessageId> invalidate) {
if (invalidate.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Queue<MessageId> pending = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(false);
try {
IncomingMessageHook hook = hooks.get(c);
if (hook != null)
hook.incomingMessage(txn, m, meta);
// check if message was deleted by client
if (db.getRawMessage(txn, m.getId()) == null) {
throw new InvalidMessageException(
"Deleted by Client");
}
db.setMessageState(txn, m, c, DELIVERED);
// deliver pending dependents
Map<MessageId, State> dependents =
db.getMessageDependents(txn, m.getId());
for (Entry<MessageId, State> i : dependents
.entrySet()) {
if (i.getValue() != PENDING) continue;
// check that all dependencies are delivered
Map<MessageId, State> dependencies =
db.getMessageDependencies(txn, i.getKey());
for (Entry<MessageId, State> j : dependencies
.entrySet()) {
if (j.getValue() != DELIVERED) return;
}
pending.add(i.getKey());
}
txn.setComplete();
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
deliverNextMessage(pending);
} catch (InvalidMessageException e) {
if (LOG.isLoggable(INFO))
LOG.log(INFO, e.toString(), e);
markMessageInvalid(m, c);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
invalidateNextMessage(invalidate);
}
});
}
private void markMessageInvalid(final Message m, final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Queue<MessageId> invalid = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(false);
try {
Map<MessageId, State> dependents =
db.getMessageDependents(txn, m.getId());
db.setMessageState(txn, m, c, INVALID);
db.deleteMessage(txn, m.getId());
db.deleteMessageMetadata(txn, m.getId());
// recursively invalidate all messages that depend on m
// TODO check that cycles are properly taken care of
for (Entry<MessageId, State> i : dependents
.entrySet()) {
if (i.getValue() != INVALID) {
invalid.add(i.getKey());
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
markNextMessageInvalid(invalid);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
private void invalidateNextMessage(Queue<MessageId> invalidate) {
try {
Transaction txn = db.startTransaction(false);
try {
MessageId id = invalidate.poll();
if (db.getMessageState(txn, id) != INVALID) {
invalidateMessage(txn, id);
invalidate.addAll(getDependentsToInvalidate(txn, id));
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
});
invalidateNextMessageAsync(invalidate);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before invalidation");
invalidateNextMessageAsync(invalidate);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void markNextMessageInvalid(final Queue<MessageId> invalid) {
if (invalid.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Message m = null;
Group g = null;
Transaction txn = db.startTransaction(true);
try {
MessageId id = invalid.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (g != null) markMessageInvalid(m, g.getClientId());
markNextMessageInvalid(invalid);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
private void invalidateMessage(Transaction txn, MessageId m)
throws DbException {
db.setMessageState(txn, m, INVALID);
db.deleteMessage(txn, m);
db.deleteMessageMetadata(txn, m);
}
private Queue<MessageId> getDependentsToInvalidate(Transaction txn,
MessageId m) throws DbException {
Queue<MessageId> invalidate = new LinkedList<MessageId>();
Map<MessageId, State> states = db.getMessageDependents(txn, m);
for (Entry<MessageId, State> e : states.entrySet()) {
if (e.getValue() != INVALID) invalidate.add(e.getKey());
}
return invalidate;
}
@Override
@@ -462,31 +418,35 @@ class ValidationManagerImpl implements ValidationManager, Service,
if (e instanceof MessageAddedEvent) {
// Validate the message if it wasn't created locally
MessageAddedEvent m = (MessageAddedEvent) e;
if (m.getContactId() != null) loadGroupAndValidate(m.getMessage());
if (m.getContactId() != null)
loadGroupAndValidateAsync(m.getMessage());
}
}
private void loadGroupAndValidate(final Message m) {
private void loadGroupAndValidateAsync(final Message m) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Group g;
Transaction txn = db.startTransaction(true);
try {
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
validateMessage(m, g);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
loadGroupAndValidate(m);
}
});
}
private void loadGroupAndValidate(final Message m) {
try {
Group g;
Transaction txn = db.startTransaction(true);
try {
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
validateMessageAsync(m, g);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}