When a message is shared, share its transitive dependencies

Like other recursive operations on the dependency graph, this is
not done in a single transaction to prevent an attacker from creating
arbitrary large transactions.

So at startup, the `ValidationManager` finds and resumes any
unfinished operations, by looking for shared messages with unshared
dependencies.
This commit is contained in:
Torsten Grote
2016-08-31 13:15:59 -03:00
parent 7a0db798d1
commit d058172429
19 changed files with 480 additions and 76 deletions

View File

@@ -72,6 +72,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
for (ClientId c : validators.keySet()) {
validateOutstandingMessagesAsync(c);
deliverOutstandingMessagesAsync(c);
shareOutstandingMessagesAsync(c);
}
}
@@ -191,6 +192,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private void deliverNextPendingMessage(Queue<MessageId> pending) {
try {
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> toShare = null;
Queue<MessageId> invalidate = null;
Transaction txn = db.startTransaction(false);
try {
@@ -213,8 +215,14 @@ class ValidationManagerImpl implements ValidationManager, Service,
ClientId c = g.getClientId();
Metadata meta = db.getMessageMetadataForValidator(txn,
id);
if (deliverMessage(txn, m, c, meta)) {
DeliveryResult result = deliverMessage(txn, m, c, meta);
if (result.valid) {
pending.addAll(getPendingDependents(txn, id));
if (result.share) {
db.setMessageShared(txn, id);
toShare = new LinkedList<MessageId>(
states.keySet());
}
} else {
invalidate = getDependentsToInvalidate(txn, id);
}
@@ -226,6 +234,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
deliverNextPendingMessageAsync(pending);
if (toShare != null) shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before delivery");
deliverNextPendingMessageAsync(pending);
@@ -284,16 +293,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
private void storeMessageContext(Message m, ClientId c,
MessageContext result) {
MessageContext context) {
try {
MessageId id = m.getId();
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> invalidate = null;
Queue<MessageId> pending = null;
Queue<MessageId> toShare = null;
Transaction txn = db.startTransaction(false);
try {
// Check if message has any dependencies
Collection<MessageId> dependencies = result.getDependencies();
Collection<MessageId> dependencies = context.getDependencies();
if (!dependencies.isEmpty()) {
db.addMessageDependencies(txn, m, dependencies);
// Check if dependencies are valid and delivered
@@ -310,11 +320,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
invalidate = getDependentsToInvalidate(txn, id);
}
} else {
Metadata meta = result.getMetadata();
Metadata meta = context.getMetadata();
db.mergeMessageMetadata(txn, id, meta);
if (allDelivered) {
if (deliverMessage(txn, m, c, meta)) {
DeliveryResult result = deliverMessage(txn, m, c, meta);
if (result.valid) {
pending = getPendingDependents(txn, id);
if (result.share) {
db.setMessageShared(txn, id);
toShare =
new LinkedList<MessageId>(dependencies);
}
} else {
invalidate = getDependentsToInvalidate(txn, id);
}
@@ -328,6 +344,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
if (pending != null) deliverNextPendingMessageAsync(pending);
if (toShare != null) shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");
} catch (NoSuchGroupException e) {
@@ -337,18 +354,21 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
}
private boolean deliverMessage(Transaction txn, Message m, ClientId c,
Metadata meta) throws DbException {
private DeliveryResult deliverMessage(Transaction txn, Message m,
ClientId c, Metadata meta) throws DbException {
// Deliver the message to the client if it's registered a hook
boolean shareMsg = false;
IncomingMessageHook hook = hooks.get(c);
if (hook != null) hook.incomingMessage(txn, m, meta);
if (hook != null) {
shareMsg = hook.incomingMessage(txn, m, meta);
}
// TODO: Find a better way for clients to signal validity, #643
if (db.getRawMessage(txn, m.getId()) == null) {
db.setMessageState(txn, m.getId(), INVALID);
return false;
return new DeliveryResult(false, false);
} else {
db.setMessageState(txn, m.getId(), DELIVERED);
return true;
return new DeliveryResult(true, shareMsg);
}
}
@@ -362,6 +382,70 @@ class ValidationManagerImpl implements ValidationManager, Service,
return pending;
}
private void shareOutstandingMessagesAsync(final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
shareOutstandingMessages(c);
}
});
}
private void shareOutstandingMessages(ClientId c) {
try {
Queue<MessageId> toShare = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(true);
try {
toShare.addAll(db.getMessagesToShare(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
shareNextMessageAsync(toShare);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
/**
* Shares the next message from the toShare queue asynchronously.
*
* This method should only be called for messages that have all their
* dependencies delivered and have been delivered themselves.
*/
private void shareNextMessageAsync(final Queue<MessageId> toShare) {
if (toShare.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
shareNextMessage(toShare);
}
});
}
private void shareNextMessage(Queue<MessageId> toShare) {
try {
Transaction txn = db.startTransaction(false);
try {
MessageId id = toShare.poll();
db.setMessageShared(txn, id);
toShare.addAll(db.getMessageDependencies(txn, id).keySet());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before sharing");
shareNextMessageAsync(toShare);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before sharing");
shareNextMessageAsync(toShare);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void invalidateNextMessageAsync(final Queue<MessageId> invalidate) {
if (invalidate.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@@ -447,4 +531,13 @@ class ValidationManagerImpl implements ValidationManager, Service,
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private static class DeliveryResult {
private final boolean valid, share;
private DeliveryResult(boolean valid, boolean share) {
this.valid = valid;
this.share = share;
}
}
}