Implement new message validation logic

that handles message dependencies reported from clients.

The MessageValidatedEvent has been renamed into a MessageDeliveredEvent
since there were no real use cases for the former any more.
This commit is contained in:
Torsten Grote
2016-05-23 18:40:40 -03:00
parent b03d0a206b
commit 5561532c5d
4 changed files with 927 additions and 25 deletions

View File

@@ -23,8 +23,10 @@ import org.briarproject.api.sync.ValidationManager;
import org.briarproject.api.sync.MessageContext;
import org.briarproject.util.ByteUtils;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -36,7 +38,9 @@ import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.sync.ValidationManager.State.DELIVERED;
import static org.briarproject.api.sync.ValidationManager.State.INVALID;
import static org.briarproject.api.sync.ValidationManager.State.PENDING;
import static org.briarproject.api.sync.ValidationManager.State.VALID;
class ValidationManagerImpl implements ValidationManager, Service,
@@ -66,7 +70,10 @@ class ValidationManagerImpl implements ValidationManager, Service,
@Override
public void startService() {
if (used.getAndSet(true)) throw new IllegalStateException();
for (ClientId c : validators.keySet()) getMessagesToValidate(c);
for (ClientId c : validators.keySet()) {
validateOutstandingMessages(c);
deliverOutstandingMessages(c);
}
}
@Override
@@ -84,7 +91,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
hooks.put(c, hook);
}
private void getMessagesToValidate(final ClientId c) {
private void validateOutstandingMessages(final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -128,6 +135,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
LOG.info("Group removed before validation");
// Continue to next message
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
if (m != null && g != null) validateMessage(m, g);
@@ -140,6 +148,112 @@ class ValidationManagerImpl implements ValidationManager, Service,
});
}
private void deliverOutstandingMessages(final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Queue<MessageId> validated = new LinkedList<MessageId>();
Queue<MessageId> pending = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(true);
try {
validated.addAll(db.getMessagesToDeliver(txn, c));
pending.addAll(db.getPendingMessages(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
deliverNextMessage(validated);
deliverNextPendingMessage(pending);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
private void deliverNextMessage(final Queue<MessageId> validated) {
if (validated.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Message m = null;
Group g = null;
Metadata meta = null;
Transaction txn = db.startTransaction(true);
try {
MessageId id = validated.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
g = db.getGroup(txn, m.getGroupId());
meta = db.getMessageMetadata(txn, id);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (g != null) deliverMessage(m, g.getClientId(), meta);
deliverNextMessage(validated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
private void deliverNextPendingMessage(final Queue<MessageId> pending) {
if (pending.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
Message m = null;
ClientId c = null;
try {
boolean allDelivered = true;
Metadata meta = null;
Transaction txn = db.startTransaction(true);
try {
MessageId id = pending.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
Group g = db.getGroup(txn, m.getGroupId());
c = g.getClientId();
// check if a dependency is invalid
Map<MessageId, State> states =
db.getMessageDependencies(txn, id);
for (Entry<MessageId, State> d : states.entrySet()) {
if (d.getValue() == INVALID) {
throw new InvalidMessageException(
"Invalid Dependency");
}
if (d.getValue() != DELIVERED) allDelivered = false;
}
if(allDelivered) {
meta = db.getMessageMetadata(txn, id);
}
txn.setComplete();
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
if (c != null && allDelivered) deliverMessage(m, c, meta);
deliverNextPendingMessage(pending);
} catch(InvalidMessageException e) {
if (LOG.isLoggable(INFO))
LOG.log(INFO, e.toString(), e);
markMessageInvalid(m, c);
deliverNextPendingMessage(pending);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
private Message parseMessage(MessageId id, byte[] raw) {
if (raw.length <= MESSAGE_HEADER_LENGTH)
throw new IllegalArgumentException();
@@ -176,19 +290,105 @@ class ValidationManagerImpl implements ValidationManager, Service,
@Override
public void run() {
try {
State newState = null;
Metadata meta = null;
Transaction txn = db.startTransaction(false);
try {
Metadata meta = result.getMetadata();
// store dependencies
Collection<MessageId> dependencies =
result.getDependencies();
if (dependencies != null && dependencies.size() > 0) {
db.addMessageDependencies(txn, m, dependencies);
}
// check if a dependency is invalid
// and if all dependencies have been delivered
Map<MessageId, State> states =
db.getMessageDependencies(txn, m.getId());
newState = VALID;
for (Entry<MessageId, State> d : states.entrySet()) {
if (d.getValue() == INVALID) {
throw new InvalidMessageException(
"Dependency Invalid");
}
if (d.getValue() != DELIVERED) {
newState = PENDING;
LOG.info("depend. undelivered, set to PENDING");
break;
}
}
// save metadata and new message state
meta = result.getMetadata();
db.mergeMessageMetadata(txn, m.getId(), meta);
db.setMessageState(txn, m.getId(), c, VALID);
db.setMessageShared(txn, m, true);
db.setMessageState(txn, m, c, newState);
txn.setComplete();
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
// deliver message if valid
if (newState == VALID) {
deliverMessage(m, c, meta);
}
} catch (InvalidMessageException e) {
if (LOG.isLoggable(INFO))
LOG.log(INFO, e.toString(), e);
markMessageInvalid(m, c);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
private void deliverMessage(final Message m, final ClientId c,
final Metadata meta) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Queue<MessageId> pending = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(false);
try {
IncomingMessageHook hook = hooks.get(c);
if (hook != null)
hook.incomingMessage(txn, m, meta);
// check if message was deleted by client
if (db.getRawMessage(txn, m.getId()) == null) {
throw new InvalidMessageException(
"Deleted by Client");
}
db.setMessageShared(txn, m, true);
db.setMessageState(txn, m, c, DELIVERED);
// deliver pending dependents
Map<MessageId, State> dependents =
db.getMessageDependents(txn, m.getId());
for (Entry<MessageId, State> i : dependents
.entrySet()) {
if (i.getValue() != PENDING) continue;
// check that all dependencies are delivered
Map<MessageId, State> dependencies =
db.getMessageDependencies(txn, i.getKey());
for (Entry<MessageId, State> j : dependencies
.entrySet()) {
if (j.getValue() != DELIVERED) return;
}
pending.add(i.getKey());
}
txn.setComplete();
} finally {
if (!txn.isComplete()) txn.setComplete();
db.endTransaction(txn);
}
deliverNextMessage(pending);
} catch (InvalidMessageException e) {
if (LOG.isLoggable(INFO))
LOG.log(INFO, e.toString(), e);
markMessageInvalid(m, c);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
@@ -202,13 +402,56 @@ class ValidationManagerImpl implements ValidationManager, Service,
@Override
public void run() {
try {
Queue<MessageId> invalid = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(false);
try {
db.setMessageState(txn, m.getId(), c, INVALID);
Map<MessageId, State> dependents =
db.getMessageDependents(txn, m.getId());
db.setMessageState(txn, m, c, INVALID);
db.deleteMessage(txn, m.getId());
db.deleteMessageMetadata(txn, m.getId());
// recursively invalidate all messages that depend on m
// TODO check that cycles are properly taken care of
for (Entry<MessageId, State> i : dependents
.entrySet()) {
if (i.getValue() != INVALID) {
invalid.add(i.getKey());
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
markNextMessageInvalid(invalid);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
private void markNextMessageInvalid(final Queue<MessageId> invalid) {
if (invalid.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Message m = null;
Group g = null;
Transaction txn = db.startTransaction(true);
try {
MessageId id = invalid.poll();
byte[] raw = db.getRawMessage(txn, id);
m = parseMessage(id, raw);
g = db.getGroup(txn, m.getGroupId());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (g != null) markMessageInvalid(m, g.getClientId());
markNextMessageInvalid(invalid);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);