Merge branch '643-allow-messages-to-be-deleted-in-the-delivery-hook' into 'master'

Allow messages to be deleted in delivery hook

Closes #643

See merge request !385
This commit is contained in:
akwizgran
2016-11-04 16:14:44 +00:00
11 changed files with 134 additions and 155 deletions

View File

@@ -1,5 +1,6 @@
package org.briarproject.api.clients;
import org.briarproject.api.FormatException;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction;
@@ -48,8 +49,20 @@ public interface MessageQueueManager {
/**
* Called once for each incoming message that passes validation.
* Messages are passed to the hook in order.
*
* @throws DbException Should only be used for real database errors.
* If this is thrown, delivery will be attempted again at next startup,
* whereas if an FormatException is thrown,
* the message will be permanently invalidated.
* @throws FormatException for any non-database error
* that occurs while handling remotely created data.
* This includes errors that occur while handling locally created data
* in a context controlled by remotely created data
* (for example, parsing the metadata of a dependency
* of an incoming message).
* Never rethrow DbException as FormatException!
*/
void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
throws DbException;
throws DbException, FormatException;
}
}

View File

@@ -55,9 +55,23 @@ public interface ValidationManager {
/**
* Called once for each incoming message that passes validation.
*
* @return whether or not this message should be shared
* @throws DbException Should only be used for real database errors.
* If this is thrown, delivery will be attempted again at next startup,
* whereas if an InvalidMessageException is thrown,
* the message will be permanently invalidated.
* @throws InvalidMessageException for any non-database error
* that occurs while handling remotely created data.
* This includes errors that occur while handling locally created data
* in a context controlled by remotely created data
* (for example, parsing the metadata of a dependency
* of an incoming message).
* Throwing this will delete the incoming message and its metadata
* marking it as invalid in the database.
* Never rethrow DbException as InvalidMessageException!
*/
boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException;
throws DbException, InvalidMessageException;
}
}

View File

@@ -48,7 +48,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -75,9 +74,6 @@ import static org.briarproject.blogs.BlogPostValidator.authorToBdfDictionary;
class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager,
AddContactHook, RemoveContactHook, Client {
private static final Logger LOG =
Logger.getLogger(BlogManagerImpl.class.getName());
static final ClientId CLIENT_ID = new ClientId(StringUtils.fromHexString(
"dafbe56f0c8971365cea4bb5f08ec9a6" +
"1d686e058b943997b6ff259ba423f613"));

View File

@@ -14,6 +14,7 @@ import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.InvalidMessageException;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.ValidationManager.IncomingMessageHook;
@@ -39,33 +40,48 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
this.metadataParser = metadataParser;
}
/**
* Called once for each incoming message that passes validation.
*
* @throws DbException Should only be used for real database errors.
* Do not rethrow
* @throws FormatException Use this for any non-database error
* that occurs while handling remotely created data.
* This includes errors that occur while handling locally created data
* in a context controlled by remotely created data
* (for example, parsing the metadata of a dependency
* of an incoming message).
* Throwing this will delete the incoming message and its metadata
* marking it as invalid in the database.
* Never rethrow DbException as FormatException
*/
protected abstract boolean incomingMessage(Transaction txn, Message m,
BdfList body, BdfDictionary meta) throws DbException,
FormatException;
@Override
public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException {
return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH);
throws DbException, InvalidMessageException {
try {
return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH);
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
}
@Override
public void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
throws DbException {
throws DbException, FormatException {
incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH);
}
private boolean incomingMessage(Transaction txn, Message m, Metadata meta,
int headerLength) throws DbException {
try {
byte[] raw = m.getRaw();
BdfList body = clientHelper.toList(raw, headerLength,
raw.length - headerLength);
BdfDictionary metaDictionary = metadataParser.parse(meta);
return incomingMessage(txn, m, body, metaDictionary);
} catch (FormatException e) {
throw new DbException(e);
}
int headerLength) throws DbException, FormatException {
byte[] raw = m.getRaw();
BdfList body = clientHelper.toList(raw, headerLength,
raw.length - headerLength);
BdfDictionary metaDictionary = metadataParser.parse(meta);
return incomingMessage(txn, m, body, metaDictionary);
}
protected void trackIncomingMessage(Transaction txn, Message m)

View File

@@ -132,7 +132,7 @@ class MessageQueueManagerImpl implements MessageQueueManager {
private long outgoingPosition, incomingPosition;
private final TreeMap<Long, MessageId> pending;
QueueState(long outgoingPosition, long incomingPosition,
private QueueState(long outgoingPosition, long incomingPosition,
TreeMap<Long, MessageId> pending) {
this.outgoingPosition = outgoingPosition;
this.incomingPosition = incomingPosition;
@@ -166,7 +166,7 @@ class MessageQueueManagerImpl implements MessageQueueManager {
private final QueueMessageValidator delegate;
DelegatingMessageValidator(QueueMessageValidator delegate) {
private DelegatingMessageValidator(QueueMessageValidator delegate) {
this.delegate = delegate;
}
@@ -193,8 +193,8 @@ class MessageQueueManagerImpl implements MessageQueueManager {
}
@Override
public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException {
public boolean incomingMessage(Transaction txn, Message m,
Metadata meta) throws DbException, InvalidMessageException {
long queuePosition = ByteUtils.readUint64(m.getRaw(),
MESSAGE_HEADER_LENGTH);
QueueState queueState = loadQueueState(txn, m.getGroupId());
@@ -227,16 +227,20 @@ class MessageQueueManagerImpl implements MessageQueueManager {
// Save the queue state before passing control to the delegate
saveQueueState(txn, m.getGroupId(), queueState);
// Deliver the messages to the delegate
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw);
if (LOG.isLoggable(INFO)) {
LOG.info("Delivering pending message with position "
+ q.getQueuePosition());
}
try {
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw);
if (LOG.isLoggable(INFO)) {
LOG.info("Delivering pending message with position "
+ q.getQueuePosition());
}
delegate.incomingMessage(txn, q, meta);
}
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
}
// message queues are only useful for groups with two members

View File

@@ -148,14 +148,12 @@ class ForumManagerImpl extends BdfIncomingMessageHook implements ForumManager {
BdfDictionary meta = new BdfDictionary();
meta.put(KEY_TIMESTAMP, p.getMessage().getTimestamp());
if (p.getParent() != null) meta.put(KEY_PARENT, p.getParent());
if (p.getAuthor() != null) {
Author a = p.getAuthor();
BdfDictionary authorMeta = new BdfDictionary();
authorMeta.put(KEY_ID, a.getId());
authorMeta.put(KEY_NAME, a.getName());
authorMeta.put(KEY_PUBLIC_NAME, a.getPublicKey());
meta.put(KEY_AUTHOR, authorMeta);
}
Author a = p.getAuthor();
BdfDictionary authorMeta = new BdfDictionary();
authorMeta.put(KEY_ID, a.getId());
authorMeta.put(KEY_NAME, a.getName());
authorMeta.put(KEY_PUBLIC_NAME, a.getPublicKey());
meta.put(KEY_AUTHOR, authorMeta);
meta.put(KEY_LOCAL, true);
meta.put(MSG_KEY_READ, true);
clientHelper.addLocalMessage(txn, p.getMessage(), meta, true);

View File

@@ -32,7 +32,6 @@ import org.briarproject.api.sync.MessageStatus;
import org.briarproject.clients.ConversationClientImpl;
import org.briarproject.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -206,7 +205,7 @@ class IntroductionManagerImpl extends ConversationClientImpl
*/
@Override
protected boolean incomingMessage(Transaction txn, Message m, BdfList body,
BdfDictionary message) throws DbException {
BdfDictionary message) throws DbException, FormatException {
// Get message data and type
GroupId groupId = m.getGroupId();
@@ -220,19 +219,9 @@ class IntroductionManagerImpl extends ConversationClientImpl
} catch (FormatException e) {
stateExists = false;
}
BdfDictionary state;
try {
if (stateExists) throw new FormatException();
state = introduceeManager.initialize(txn, groupId, message);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) {
LOG.warning(
"Could not initialize introducee state, deleting...");
LOG.log(WARNING, e.toString(), e);
}
deleteMessage(txn, m.getId());
return false;
}
if (stateExists) throw new FormatException();
BdfDictionary state =
introduceeManager.initialize(txn, groupId, message);
try {
introduceeManager.incomingMessage(txn, state, message);
trackIncomingMessage(txn, m);
@@ -240,21 +229,15 @@ class IntroductionManagerImpl extends ConversationClientImpl
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
introduceeManager.abort(txn, state);
} catch (FormatException e) {
// FIXME necessary?
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
introduceeManager.abort(txn, state);
}
}
// our role can be anything
else if (type == TYPE_RESPONSE || type == TYPE_ACK || type == TYPE_ABORT) {
BdfDictionary state;
try {
state = getSessionState(txn, groupId,
message.getRaw(SESSION_ID));
} catch (FormatException e) {
LOG.warning("Could not find state for message, deleting...");
deleteMessage(txn, m.getId());
return false;
}
BdfDictionary state =
getSessionState(txn, groupId, message.getRaw(SESSION_ID));
long role = state.getLong(ROLE, -1L);
try {
@@ -263,18 +246,17 @@ class IntroductionManagerImpl extends ConversationClientImpl
} else if (role == ROLE_INTRODUCEE) {
introduceeManager.incomingMessage(txn, state, message);
} else {
if(LOG.isLoggable(WARNING)) {
LOG.warning("Unknown role '" + role +
"'. Deleting message...");
deleteMessage(txn, m.getId());
}
if (LOG.isLoggable(WARNING))
LOG.warning("Unknown role '" + role + "'");
throw new DbException();
}
if (type == TYPE_RESPONSE) trackIncomingMessage(txn, m);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
if (role == ROLE_INTRODUCER) introducerManager.abort(txn, state);
else introduceeManager.abort(txn, state);
} catch (IOException e) {
} catch (FormatException e) {
// FIXME necessary?
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
if (role == ROLE_INTRODUCER) introducerManager.abort(txn, state);
else introduceeManager.abort(txn, state);

View File

@@ -41,7 +41,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -65,8 +64,6 @@ import static org.briarproject.privategroup.Constants.KEY_TYPE;
public class PrivateGroupManagerImpl extends BdfIncomingMessageHook implements
PrivateGroupManager {
private static final Logger LOG =
Logger.getLogger(PrivateGroupManagerImpl.class.getName());
static final ClientId CLIENT_ID = new ClientId(
StringUtils.fromHexString("5072697661746547726f75704d616e61"
+ "67657220627920546f727374656e2047"));
@@ -198,7 +195,8 @@ public class PrivateGroupManagerImpl extends BdfIncomingMessageHook implements
try {
BdfDictionary meta = new BdfDictionary();
meta.put(KEY_TYPE, POST.getInt());
if (m.getParent() != null) meta.put(KEY_PARENT_MSG_ID, m.getParent());
if (m.getParent() != null)
meta.put(KEY_PARENT_MSG_ID, m.getParent());
addMessageMetadata(meta, m, true);
clientHelper.addLocalMessage(txn, m.getMessage(), meta, true);
setPreviousMsgId(txn, m.getMessage().getGroupId(),
@@ -429,44 +427,29 @@ public class PrivateGroupManagerImpl extends BdfIncomingMessageHook implements
MessageId parentId = new MessageId(parentIdBytes);
BdfDictionary parentMeta = clientHelper
.getMessageMetadataAsDictionary(txn, parentId);
if (timestamp <= parentMeta.getLong(KEY_TIMESTAMP)) {
// FIXME throw new InvalidMessageException() (#643)
db.deleteMessage(txn, m.getId());
return false;
}
if (timestamp <= parentMeta.getLong(KEY_TIMESTAMP))
throw new FormatException();
MessageType parentType = MessageType
.valueOf(parentMeta.getLong(KEY_TYPE).intValue());
if (parentType != POST) {
// FIXME throw new InvalidMessageException() (#643)
db.deleteMessage(txn, m.getId());
return false;
}
if (parentType != POST)
throw new FormatException();
}
// and the member's previous message
byte[] previousMsgIdBytes = meta.getRaw(KEY_PREVIOUS_MSG_ID);
MessageId previousMsgId = new MessageId(previousMsgIdBytes);
BdfDictionary previousMeta = clientHelper
.getMessageMetadataAsDictionary(txn, previousMsgId);
if (timestamp <= previousMeta.getLong(KEY_TIMESTAMP)) {
// FIXME throw new InvalidMessageException() (#643)
db.deleteMessage(txn, m.getId());
return false;
}
if (timestamp <= previousMeta.getLong(KEY_TIMESTAMP))
throw new FormatException();
// previous message must be from same member
if (!Arrays.equals(meta.getRaw(KEY_MEMBER_ID),
previousMeta.getRaw(KEY_MEMBER_ID))) {
// FIXME throw new InvalidMessageException() (#643)
db.deleteMessage(txn, m.getId());
return false;
}
previousMeta.getRaw(KEY_MEMBER_ID)))
throw new FormatException();
// previous message must be a POST or JOIN
MessageType previousType = MessageType
.valueOf(previousMeta.getLong(KEY_TYPE).intValue());
if (previousType != JOIN && previousType != POST) {
// FIXME throw new InvalidMessageException() (#643)
db.deleteMessage(txn, m.getId());
return false;
}
if (previousType != JOIN && previousType != POST)
throw new FormatException();
trackIncomingMessage(txn, m);
return true;
default:

View File

@@ -205,30 +205,25 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS
// this is what we would expect under normal circumstances
stateExists = false;
}
try {
// check if we already have a state with that sessionId
if (stateExists) throw new FormatException();
// check if we already have a state with that sessionId
if (stateExists) throw new FormatException();
// check if shareable can be shared
I invitation = (I) msg;
S f = getSFactory().parse(invitation);
ContactId contactId = getContactId(txn, m.getGroupId());
Contact contact = db.getContact(txn, contactId);
if (!canBeShared(txn, f.getId(), contact))
checkForRaceCondition(txn, f, contact);
// check if shareable can be shared
I invitation = (I) msg;
S f = getSFactory().parse(invitation);
ContactId contactId = getContactId(txn, m.getGroupId());
Contact contact = db.getContact(txn, contactId);
if (!canBeShared(txn, f.getId(), contact))
checkForRaceCondition(txn, f, contact);
// initialize state and process invitation
IS state = initializeInviteeState(txn, contactId, invitation,
m.getId());
InviteeEngine<IS, IR> engine =
new InviteeEngine<IS, IR>(getIRFactory(), clock);
processInviteeStateUpdate(txn, m.getId(),
engine.onMessageReceived(state, msg));
trackIncomingMessage(txn, m);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
deleteMessage(txn, m.getId());
}
// initialize state and process invitation
IS state = initializeInviteeState(txn, contactId, invitation,
m.getId());
InviteeEngine<IS, IR> engine =
new InviteeEngine<IS, IR>(getIRFactory(), clock);
processInviteeStateUpdate(txn, m.getId(),
engine.onMessageReceived(state, msg));
trackIncomingMessage(txn, m);
} else if (msg.getType() == SHARE_MSG_TYPE_ACCEPT ||
msg.getType() == SHARE_MSG_TYPE_DECLINE) {
// we are a sharer who just received a response
@@ -262,7 +257,7 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS
}
} else {
// message has passed validator, so that should never happen
throw new RuntimeException("Illegal Sharing Message");
throw new AssertionError("Illegal Sharing Message");
}
// don't share message as other party already has it
return false;

View File

@@ -360,16 +360,18 @@ class ValidationManagerImpl implements ValidationManager, Service,
boolean shareMsg = false;
IncomingMessageHook hook = hooks.get(c);
if (hook != null) {
shareMsg = hook.incomingMessage(txn, m, meta);
}
// TODO: Find a better way for clients to signal validity, #643
if (db.getRawMessage(txn, m.getId()) == null) {
db.setMessageState(txn, m.getId(), INVALID);
return new DeliveryResult(false, false);
} else {
db.setMessageState(txn, m.getId(), DELIVERED);
return new DeliveryResult(true, shareMsg);
try {
shareMsg = hook.incomingMessage(txn, m, meta);
} catch (InvalidMessageException e) {
// message is invalid, mark it as such and delete it
db.setMessageState(txn, m.getId(), INVALID);
db.deleteMessageMetadata(txn, m.getId());
db.deleteMessage(txn, m.getId());
return new DeliveryResult(false, false);
}
}
db.setMessageState(txn, m.getId(), DELIVERED);
return new DeliveryResult(true, shareMsg);
}
private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m)

View File

@@ -110,8 +110,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the first message
oneOf(hook).incomingMessage(txn2, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn2, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn2, messageId, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn2, messageId);
@@ -215,8 +213,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the message
oneOf(hook).incomingMessage(txn2, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn2, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn2, messageId, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn2, messageId);
@@ -240,8 +236,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the dependent
oneOf(hook).incomingMessage(txn3, message2, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn3, messageId2);
will(returnValue(raw));
oneOf(db).setMessageState(txn3, messageId2, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn3, messageId2);
@@ -366,8 +360,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(true));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn1, messageId);
@@ -589,8 +581,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn1, messageId);
@@ -707,8 +697,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn1, messageId);
@@ -953,8 +941,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
// The message has two pending dependents: 1 and 2
oneOf(db).getMessageDependents(txn1, messageId);
@@ -978,8 +964,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver message 1
oneOf(hook).incomingMessage(txn2, message1, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn2, messageId1);
will(returnValue(raw));
oneOf(db).setMessageState(txn2, messageId1, DELIVERED);
// Message 1 has one pending dependent: 3
oneOf(db).getMessageDependents(txn2, messageId1);
@@ -1003,8 +987,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver message 2
oneOf(hook).incomingMessage(txn3, message2, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn3, messageId2);
will(returnValue(raw));
oneOf(db).setMessageState(txn3, messageId2, DELIVERED);
// Message 2 has one pending dependent: 3 (same dependent as 1)
oneOf(db).getMessageDependents(txn3, messageId2);
@@ -1027,8 +1009,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
will(returnValue(metadata));
// Deliver message 3
oneOf(hook).incomingMessage(txn4, message3, metadata);
oneOf(db).getRawMessage(txn4, messageId3);
will(returnValue(raw));
oneOf(db).setMessageState(txn4, messageId3, DELIVERED);
// Message 3 has one pending dependent: 4
oneOf(db).getMessageDependents(txn4, messageId3);
@@ -1059,8 +1039,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver message 4
oneOf(hook).incomingMessage(txn6, message4, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn6, messageId4);
will(returnValue(raw));
oneOf(db).setMessageState(txn6, messageId4, DELIVERED);
// Message 4 has no pending dependents
oneOf(db).getMessageDependents(txn6, messageId4);
@@ -1111,8 +1089,6 @@ public class ValidationManagerImplTest extends BriarTestCase {
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn1, messageId);