Allow messages to be deleted in delivery hook

This commit is contained in:
Torsten Grote
2016-11-03 18:42:26 -02:00
parent e810a1265a
commit 3f9a254a0b
10 changed files with 103 additions and 113 deletions

View File

@@ -48,8 +48,17 @@ public interface MessageQueueManager {
/** /**
* Called once for each incoming message that passes validation. * Called once for each incoming message that passes validation.
* Messages are passed to the hook in order. * Messages are passed to the hook in order.
*
* @throws DbException should only be used for real database errors
* @throws InvalidMessageException for any non-database error
* that occurs while handling remotely created data.
* This includes errors that occur while handling locally created data
* in a context controlled by remotely created data
* (for example, parsing the metadata of a dependency
* of an incoming message).
* Never rethrow DbException as InvalidMessageException
*/ */
void incomingMessage(Transaction txn, QueueMessage q, Metadata meta) void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
throws DbException; throws DbException, InvalidMessageException;
} }
} }

View File

@@ -55,9 +55,20 @@ public interface ValidationManager {
/** /**
* Called once for each incoming message that passes validation. * Called once for each incoming message that passes validation.
*
* @return whether or not this message should be shared * @return whether or not this message should be shared
* @throws DbException should only be used for real database errors
* @throws InvalidMessageException for any non-database error
* that occurs while handling remotely created data.
* This includes errors that occur while handling locally created data
* in a context controlled by remotely created data
* (for example, parsing the metadata of a dependency
* of an incoming message).
* Throwing this will delete the incoming message and its metadata
* marking it as invalid in the database.
* Never rethrow DbException as InvalidMessageException
*/ */
boolean incomingMessage(Transaction txn, Message m, Metadata meta) boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException; throws DbException, InvalidMessageException;
} }
} }

View File

@@ -48,7 +48,6 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
import javax.inject.Inject; import javax.inject.Inject;
@@ -75,9 +74,6 @@ import static org.briarproject.blogs.BlogPostValidator.authorToBdfDictionary;
class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager, class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager,
AddContactHook, RemoveContactHook, Client { AddContactHook, RemoveContactHook, Client {
private static final Logger LOG =
Logger.getLogger(BlogManagerImpl.class.getName());
static final ClientId CLIENT_ID = new ClientId(StringUtils.fromHexString( static final ClientId CLIENT_ID = new ClientId(StringUtils.fromHexString(
"dafbe56f0c8971365cea4bb5f08ec9a6" + "dafbe56f0c8971365cea4bb5f08ec9a6" +
"1d686e058b943997b6ff259ba423f613")); "1d686e058b943997b6ff259ba423f613"));

View File

@@ -14,6 +14,7 @@ import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata; import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction; import org.briarproject.api.db.Transaction;
import org.briarproject.api.sync.GroupId; import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.InvalidMessageException;
import org.briarproject.api.sync.Message; import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId; import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.ValidationManager.IncomingMessageHook; import org.briarproject.api.sync.ValidationManager.IncomingMessageHook;
@@ -39,24 +40,39 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
this.metadataParser = metadataParser; this.metadataParser = metadataParser;
} }
/**
* Called once for each incoming message that passes validation.
*
* @throws DbException Should only be used for real database errors.
* Do not rethrow
* @throws FormatException Use this for any non-database error
* that occurs while handling remotely created data.
* This includes errors that occur while handling locally created data
* in a context controlled by remotely created data
* (for example, parsing the metadata of a dependency
* of an incoming message).
* Throwing this will delete the incoming message and its metadata
* marking it as invalid in the database.
* Never rethrow DbException as FormatException
*/
protected abstract boolean incomingMessage(Transaction txn, Message m, protected abstract boolean incomingMessage(Transaction txn, Message m,
BdfList body, BdfDictionary meta) throws DbException, BdfList body, BdfDictionary meta) throws DbException,
FormatException; FormatException;
@Override @Override
public boolean incomingMessage(Transaction txn, Message m, Metadata meta) public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException { throws DbException, InvalidMessageException {
return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH); return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH);
} }
@Override @Override
public void incomingMessage(Transaction txn, QueueMessage q, Metadata meta) public void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
throws DbException { throws DbException, InvalidMessageException {
incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH); incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH);
} }
private boolean incomingMessage(Transaction txn, Message m, Metadata meta, private boolean incomingMessage(Transaction txn, Message m, Metadata meta,
int headerLength) throws DbException { int headerLength) throws DbException, InvalidMessageException {
try { try {
byte[] raw = m.getRaw(); byte[] raw = m.getRaw();
BdfList body = clientHelper.toList(raw, headerLength, BdfList body = clientHelper.toList(raw, headerLength,
@@ -64,7 +80,7 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
BdfDictionary metaDictionary = metadataParser.parse(meta); BdfDictionary metaDictionary = metadataParser.parse(meta);
return incomingMessage(txn, m, body, metaDictionary); return incomingMessage(txn, m, body, metaDictionary);
} catch (FormatException e) { } catch (FormatException e) {
throw new DbException(e); throw new InvalidMessageException(e);
} }
} }

View File

@@ -132,7 +132,7 @@ class MessageQueueManagerImpl implements MessageQueueManager {
private long outgoingPosition, incomingPosition; private long outgoingPosition, incomingPosition;
private final TreeMap<Long, MessageId> pending; private final TreeMap<Long, MessageId> pending;
QueueState(long outgoingPosition, long incomingPosition, private QueueState(long outgoingPosition, long incomingPosition,
TreeMap<Long, MessageId> pending) { TreeMap<Long, MessageId> pending) {
this.outgoingPosition = outgoingPosition; this.outgoingPosition = outgoingPosition;
this.incomingPosition = incomingPosition; this.incomingPosition = incomingPosition;
@@ -166,7 +166,7 @@ class MessageQueueManagerImpl implements MessageQueueManager {
private final QueueMessageValidator delegate; private final QueueMessageValidator delegate;
DelegatingMessageValidator(QueueMessageValidator delegate) { private DelegatingMessageValidator(QueueMessageValidator delegate) {
this.delegate = delegate; this.delegate = delegate;
} }
@@ -193,8 +193,8 @@ class MessageQueueManagerImpl implements MessageQueueManager {
} }
@Override @Override
public boolean incomingMessage(Transaction txn, Message m, Metadata meta) public boolean incomingMessage(Transaction txn, Message m,
throws DbException { Metadata meta) throws DbException, InvalidMessageException {
long queuePosition = ByteUtils.readUint64(m.getRaw(), long queuePosition = ByteUtils.readUint64(m.getRaw(),
MESSAGE_HEADER_LENGTH); MESSAGE_HEADER_LENGTH);
QueueState queueState = loadQueueState(txn, m.getGroupId()); QueueState queueState = loadQueueState(txn, m.getGroupId());

View File

@@ -148,14 +148,12 @@ class ForumManagerImpl extends BdfIncomingMessageHook implements ForumManager {
BdfDictionary meta = new BdfDictionary(); BdfDictionary meta = new BdfDictionary();
meta.put(KEY_TIMESTAMP, p.getMessage().getTimestamp()); meta.put(KEY_TIMESTAMP, p.getMessage().getTimestamp());
if (p.getParent() != null) meta.put(KEY_PARENT, p.getParent()); if (p.getParent() != null) meta.put(KEY_PARENT, p.getParent());
if (p.getAuthor() != null) { Author a = p.getAuthor();
Author a = p.getAuthor(); BdfDictionary authorMeta = new BdfDictionary();
BdfDictionary authorMeta = new BdfDictionary(); authorMeta.put(KEY_ID, a.getId());
authorMeta.put(KEY_ID, a.getId()); authorMeta.put(KEY_NAME, a.getName());
authorMeta.put(KEY_NAME, a.getName()); authorMeta.put(KEY_PUBLIC_NAME, a.getPublicKey());
authorMeta.put(KEY_PUBLIC_NAME, a.getPublicKey()); meta.put(KEY_AUTHOR, authorMeta);
meta.put(KEY_AUTHOR, authorMeta);
}
meta.put(KEY_LOCAL, true); meta.put(KEY_LOCAL, true);
meta.put(MSG_KEY_READ, true); meta.put(MSG_KEY_READ, true);
clientHelper.addLocalMessage(txn, p.getMessage(), meta, true); clientHelper.addLocalMessage(txn, p.getMessage(), meta, true);

View File

@@ -32,7 +32,6 @@ import org.briarproject.api.sync.MessageStatus;
import org.briarproject.clients.ConversationClientImpl; import org.briarproject.clients.ConversationClientImpl;
import org.briarproject.util.StringUtils; import org.briarproject.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@@ -206,7 +205,7 @@ class IntroductionManagerImpl extends ConversationClientImpl
*/ */
@Override @Override
protected boolean incomingMessage(Transaction txn, Message m, BdfList body, protected boolean incomingMessage(Transaction txn, Message m, BdfList body,
BdfDictionary message) throws DbException { BdfDictionary message) throws DbException, FormatException {
// Get message data and type // Get message data and type
GroupId groupId = m.getGroupId(); GroupId groupId = m.getGroupId();
@@ -220,19 +219,9 @@ class IntroductionManagerImpl extends ConversationClientImpl
} catch (FormatException e) { } catch (FormatException e) {
stateExists = false; stateExists = false;
} }
BdfDictionary state; if (stateExists) throw new FormatException();
try { BdfDictionary state =
if (stateExists) throw new FormatException(); introduceeManager.initialize(txn, groupId, message);
state = introduceeManager.initialize(txn, groupId, message);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) {
LOG.warning(
"Could not initialize introducee state, deleting...");
LOG.log(WARNING, e.toString(), e);
}
deleteMessage(txn, m.getId());
return false;
}
try { try {
introduceeManager.incomingMessage(txn, state, message); introduceeManager.incomingMessage(txn, state, message);
trackIncomingMessage(txn, m); trackIncomingMessage(txn, m);
@@ -240,21 +229,15 @@ class IntroductionManagerImpl extends ConversationClientImpl
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
introduceeManager.abort(txn, state); introduceeManager.abort(txn, state);
} catch (FormatException e) { } catch (FormatException e) {
// FIXME necessary?
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
introduceeManager.abort(txn, state); introduceeManager.abort(txn, state);
} }
} }
// our role can be anything // our role can be anything
else if (type == TYPE_RESPONSE || type == TYPE_ACK || type == TYPE_ABORT) { else if (type == TYPE_RESPONSE || type == TYPE_ACK || type == TYPE_ABORT) {
BdfDictionary state; BdfDictionary state =
try { getSessionState(txn, groupId, message.getRaw(SESSION_ID));
state = getSessionState(txn, groupId,
message.getRaw(SESSION_ID));
} catch (FormatException e) {
LOG.warning("Could not find state for message, deleting...");
deleteMessage(txn, m.getId());
return false;
}
long role = state.getLong(ROLE, -1L); long role = state.getLong(ROLE, -1L);
try { try {
@@ -263,18 +246,15 @@ class IntroductionManagerImpl extends ConversationClientImpl
} else if (role == ROLE_INTRODUCEE) { } else if (role == ROLE_INTRODUCEE) {
introduceeManager.incomingMessage(txn, state, message); introduceeManager.incomingMessage(txn, state, message);
} else { } else {
if(LOG.isLoggable(WARNING)) { throw new AssertionError("Unknown role '" + role + "'");
LOG.warning("Unknown role '" + role +
"'. Deleting message...");
deleteMessage(txn, m.getId());
}
} }
if (type == TYPE_RESPONSE) trackIncomingMessage(txn, m); if (type == TYPE_RESPONSE) trackIncomingMessage(txn, m);
} catch (DbException e) { } catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
if (role == ROLE_INTRODUCER) introducerManager.abort(txn, state); if (role == ROLE_INTRODUCER) introducerManager.abort(txn, state);
else introduceeManager.abort(txn, state); else introduceeManager.abort(txn, state);
} catch (IOException e) { } catch (FormatException e) {
// FIXME necessary?
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
if (role == ROLE_INTRODUCER) introducerManager.abort(txn, state); if (role == ROLE_INTRODUCER) introducerManager.abort(txn, state);
else introduceeManager.abort(txn, state); else introduceeManager.abort(txn, state);

View File

@@ -41,7 +41,6 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
import javax.inject.Inject; import javax.inject.Inject;
@@ -65,8 +64,6 @@ import static org.briarproject.privategroup.Constants.KEY_TYPE;
public class PrivateGroupManagerImpl extends BdfIncomingMessageHook implements public class PrivateGroupManagerImpl extends BdfIncomingMessageHook implements
PrivateGroupManager { PrivateGroupManager {
private static final Logger LOG =
Logger.getLogger(PrivateGroupManagerImpl.class.getName());
static final ClientId CLIENT_ID = new ClientId( static final ClientId CLIENT_ID = new ClientId(
StringUtils.fromHexString("5072697661746547726f75704d616e61" StringUtils.fromHexString("5072697661746547726f75704d616e61"
+ "67657220627920546f727374656e2047")); + "67657220627920546f727374656e2047"));
@@ -198,7 +195,8 @@ public class PrivateGroupManagerImpl extends BdfIncomingMessageHook implements
try { try {
BdfDictionary meta = new BdfDictionary(); BdfDictionary meta = new BdfDictionary();
meta.put(KEY_TYPE, POST.getInt()); meta.put(KEY_TYPE, POST.getInt());
if (m.getParent() != null) meta.put(KEY_PARENT_MSG_ID, m.getParent()); if (m.getParent() != null)
meta.put(KEY_PARENT_MSG_ID, m.getParent());
addMessageMetadata(meta, m, true); addMessageMetadata(meta, m, true);
clientHelper.addLocalMessage(txn, m.getMessage(), meta, true); clientHelper.addLocalMessage(txn, m.getMessage(), meta, true);
setPreviousMsgId(txn, m.getMessage().getGroupId(), setPreviousMsgId(txn, m.getMessage().getGroupId(),
@@ -429,44 +427,29 @@ public class PrivateGroupManagerImpl extends BdfIncomingMessageHook implements
MessageId parentId = new MessageId(parentIdBytes); MessageId parentId = new MessageId(parentIdBytes);
BdfDictionary parentMeta = clientHelper BdfDictionary parentMeta = clientHelper
.getMessageMetadataAsDictionary(txn, parentId); .getMessageMetadataAsDictionary(txn, parentId);
if (timestamp <= parentMeta.getLong(KEY_TIMESTAMP)) { if (timestamp <= parentMeta.getLong(KEY_TIMESTAMP))
// FIXME throw new InvalidMessageException() (#643) throw new FormatException();
db.deleteMessage(txn, m.getId());
return false;
}
MessageType parentType = MessageType MessageType parentType = MessageType
.valueOf(parentMeta.getLong(KEY_TYPE).intValue()); .valueOf(parentMeta.getLong(KEY_TYPE).intValue());
if (parentType != POST) { if (parentType != POST)
// FIXME throw new InvalidMessageException() (#643) throw new FormatException();
db.deleteMessage(txn, m.getId());
return false;
}
} }
// and the member's previous message // and the member's previous message
byte[] previousMsgIdBytes = meta.getRaw(KEY_PREVIOUS_MSG_ID); byte[] previousMsgIdBytes = meta.getRaw(KEY_PREVIOUS_MSG_ID);
MessageId previousMsgId = new MessageId(previousMsgIdBytes); MessageId previousMsgId = new MessageId(previousMsgIdBytes);
BdfDictionary previousMeta = clientHelper BdfDictionary previousMeta = clientHelper
.getMessageMetadataAsDictionary(txn, previousMsgId); .getMessageMetadataAsDictionary(txn, previousMsgId);
if (timestamp <= previousMeta.getLong(KEY_TIMESTAMP)) { if (timestamp <= previousMeta.getLong(KEY_TIMESTAMP))
// FIXME throw new InvalidMessageException() (#643) throw new FormatException();
db.deleteMessage(txn, m.getId());
return false;
}
// previous message must be from same member // previous message must be from same member
if (!Arrays.equals(meta.getRaw(KEY_MEMBER_ID), if (!Arrays.equals(meta.getRaw(KEY_MEMBER_ID),
previousMeta.getRaw(KEY_MEMBER_ID))) { previousMeta.getRaw(KEY_MEMBER_ID)))
// FIXME throw new InvalidMessageException() (#643) throw new FormatException();
db.deleteMessage(txn, m.getId());
return false;
}
// previous message must be a POST or JOIN // previous message must be a POST or JOIN
MessageType previousType = MessageType MessageType previousType = MessageType
.valueOf(previousMeta.getLong(KEY_TYPE).intValue()); .valueOf(previousMeta.getLong(KEY_TYPE).intValue());
if (previousType != JOIN && previousType != POST) { if (previousType != JOIN && previousType != POST)
// FIXME throw new InvalidMessageException() (#643) throw new FormatException();
db.deleteMessage(txn, m.getId());
return false;
}
trackIncomingMessage(txn, m); trackIncomingMessage(txn, m);
return true; return true;
default: default:

View File

@@ -205,30 +205,25 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS
// this is what we would expect under normal circumstances // this is what we would expect under normal circumstances
stateExists = false; stateExists = false;
} }
try { // check if we already have a state with that sessionId
// check if we already have a state with that sessionId if (stateExists) throw new FormatException();
if (stateExists) throw new FormatException();
// check if shareable can be shared // check if shareable can be shared
I invitation = (I) msg; I invitation = (I) msg;
S f = getSFactory().parse(invitation); S f = getSFactory().parse(invitation);
ContactId contactId = getContactId(txn, m.getGroupId()); ContactId contactId = getContactId(txn, m.getGroupId());
Contact contact = db.getContact(txn, contactId); Contact contact = db.getContact(txn, contactId);
if (!canBeShared(txn, f.getId(), contact)) if (!canBeShared(txn, f.getId(), contact))
checkForRaceCondition(txn, f, contact); checkForRaceCondition(txn, f, contact);
// initialize state and process invitation // initialize state and process invitation
IS state = initializeInviteeState(txn, contactId, invitation, IS state = initializeInviteeState(txn, contactId, invitation,
m.getId()); m.getId());
InviteeEngine<IS, IR> engine = InviteeEngine<IS, IR> engine =
new InviteeEngine<IS, IR>(getIRFactory(), clock); new InviteeEngine<IS, IR>(getIRFactory(), clock);
processInviteeStateUpdate(txn, m.getId(), processInviteeStateUpdate(txn, m.getId(),
engine.onMessageReceived(state, msg)); engine.onMessageReceived(state, msg));
trackIncomingMessage(txn, m); trackIncomingMessage(txn, m);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
deleteMessage(txn, m.getId());
}
} else if (msg.getType() == SHARE_MSG_TYPE_ACCEPT || } else if (msg.getType() == SHARE_MSG_TYPE_ACCEPT ||
msg.getType() == SHARE_MSG_TYPE_DECLINE) { msg.getType() == SHARE_MSG_TYPE_DECLINE) {
// we are a sharer who just received a response // we are a sharer who just received a response
@@ -262,7 +257,7 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS
} }
} else { } else {
// message has passed validator, so that should never happen // message has passed validator, so that should never happen
throw new RuntimeException("Illegal Sharing Message"); throw new AssertionError("Illegal Sharing Message");
} }
// don't share message as other party already has it // don't share message as other party already has it
return false; return false;

View File

@@ -360,16 +360,18 @@ class ValidationManagerImpl implements ValidationManager, Service,
boolean shareMsg = false; boolean shareMsg = false;
IncomingMessageHook hook = hooks.get(c); IncomingMessageHook hook = hooks.get(c);
if (hook != null) { if (hook != null) {
shareMsg = hook.incomingMessage(txn, m, meta); try {
} shareMsg = hook.incomingMessage(txn, m, meta);
// TODO: Find a better way for clients to signal validity, #643 } catch (InvalidMessageException e) {
if (db.getRawMessage(txn, m.getId()) == null) { // message is invalid, mark it as such and delete it
db.setMessageState(txn, m.getId(), INVALID); db.setMessageState(txn, m.getId(), INVALID);
return new DeliveryResult(false, false); db.deleteMessageMetadata(txn, m.getId());
} else { db.deleteMessage(txn, m.getId());
db.setMessageState(txn, m.getId(), DELIVERED); return new DeliveryResult(false, false);
return new DeliveryResult(true, shareMsg); }
} }
db.setMessageState(txn, m.getId(), DELIVERED);
return new DeliveryResult(true, shareMsg);
} }
private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m) private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m)