Factor MessageTracker out of BdfIncomingMessageHook.

This commit is contained in:
akwizgran
2016-11-11 16:55:06 +00:00
parent ab16ee7465
commit aa210fc555
46 changed files with 628 additions and 379 deletions

View File

@@ -3,31 +3,25 @@ package org.briarproject.clients;
import org.briarproject.api.FormatException;
import org.briarproject.api.clients.ClientHelper;
import org.briarproject.api.clients.MessageQueueManager.IncomingQueueMessageHook;
import org.briarproject.api.clients.MessageTracker;
import org.briarproject.api.clients.QueueMessage;
import org.briarproject.api.data.BdfDictionary;
import org.briarproject.api.data.BdfEntry;
import org.briarproject.api.data.BdfList;
import org.briarproject.api.data.MetadataParser;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.nullsafety.NotNullByDefault;
import org.briarproject.api.sync.InvalidMessageException;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.ValidationManager.IncomingMessageHook;
import static org.briarproject.api.clients.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.clients.BdfConstants.GROUP_KEY_LATEST_MSG;
import static org.briarproject.clients.BdfConstants.GROUP_KEY_MSG_COUNT;
import static org.briarproject.clients.BdfConstants.GROUP_KEY_UNREAD_COUNT;
import static org.briarproject.clients.BdfConstants.MSG_KEY_READ;
@NotNullByDefault
public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
IncomingQueueMessageHook, MessageTracker {
IncomingQueueMessageHook {
protected final DatabaseComponent db;
protected final ClientHelper clientHelper;
@@ -44,16 +38,16 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
* Called once for each incoming message that passes validation.
*
* @throws DbException Should only be used for real database errors.
* Do not rethrow
* If this is thrown, delivery will be attempted again at next startup,
* whereas if a FormatException is thrown, the message will be permanently
* invalidated.
* @throws FormatException Use this for any non-database error
* that occurs while handling remotely created data.
* This includes errors that occur while handling locally created data
* in a context controlled by remotely created data
* (for example, parsing the metadata of a dependency
* of an incoming message).
* Throwing this will delete the incoming message and its metadata
* marking it as invalid in the database.
* Never rethrow DbException as FormatException
* Never rethrow DbException as FormatException!
*/
protected abstract boolean incomingMessage(Transaction txn, Message m,
BdfList body, BdfDictionary meta) throws DbException,
@@ -71,8 +65,12 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
@Override
public void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
throws DbException, FormatException {
incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH);
throws DbException, InvalidMessageException {
try {
incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH);
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
}
private boolean incomingMessage(Transaction txn, Message m, Metadata meta,
@@ -84,102 +82,4 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
return incomingMessage(txn, m, body, metaDictionary);
}
protected void trackIncomingMessage(Transaction txn, Message m)
throws DbException {
trackMessage(txn, m.getGroupId(), m.getTimestamp(), false);
}
protected void trackOutgoingMessage(Transaction txn, Message m)
throws DbException {
trackMessage(txn, m.getGroupId(), m.getTimestamp(), true);
}
protected void trackMessage(Transaction txn, GroupId g, long time,
boolean read) throws DbException {
GroupCount c = getGroupCount(txn, g);
int msgCount = c.getMsgCount() + 1;
int unreadCount = c.getUnreadCount() + (read ? 0 : 1);
long latestTime =
time > c.getLatestMsgTime() ? time : c.getLatestMsgTime();
storeGroupCount(txn, g,
new GroupCount(msgCount, unreadCount, latestTime));
}
@Override
public GroupCount getGroupCount(GroupId g) throws DbException {
GroupCount count;
Transaction txn = db.startTransaction(true);
try {
count = getGroupCount(txn, g);
db.commitTransaction(txn);
}
finally {
db.endTransaction(txn);
}
return count;
}
protected GroupCount getGroupCount(Transaction txn, GroupId g)
throws DbException {
GroupCount count;
try {
BdfDictionary d = clientHelper.getGroupMetadataAsDictionary(txn, g);
count = new GroupCount(
d.getLong(GROUP_KEY_MSG_COUNT, 0L).intValue(),
d.getLong(GROUP_KEY_UNREAD_COUNT, 0L).intValue(),
d.getLong(GROUP_KEY_LATEST_MSG, 0L)
);
} catch (FormatException e) {
throw new DbException(e);
}
return count;
}
private void storeGroupCount(Transaction txn, GroupId g, GroupCount c)
throws DbException{
try {
BdfDictionary d = BdfDictionary.of(
new BdfEntry(GROUP_KEY_MSG_COUNT, c.getMsgCount()),
new BdfEntry(GROUP_KEY_UNREAD_COUNT, c.getUnreadCount()),
new BdfEntry(GROUP_KEY_LATEST_MSG, c.getLatestMsgTime())
);
clientHelper.mergeGroupMetadata(txn, g, d);
} catch (FormatException e) {
throw new DbException(e);
}
}
@Override
public void setReadFlag(GroupId g, MessageId m, boolean read)
throws DbException {
Transaction txn = db.startTransaction(false);
try {
// check current read status of message
BdfDictionary old =
clientHelper.getMessageMetadataAsDictionary(txn, m);
boolean wasRead = old.getBoolean(MSG_KEY_READ, false);
// if status changed
if (wasRead != read) {
// mark individual message as read
BdfDictionary meta = new BdfDictionary();
meta.put(MSG_KEY_READ, read);
clientHelper.mergeMessageMetadata(txn, m, meta);
// update unread counter in group metadata
GroupCount c = getGroupCount(txn, g);
BdfDictionary d = new BdfDictionary();
int count = c.getUnreadCount() + (read ? -1 : 1);
if (count < 0) throw new DbException();
d.put(GROUP_KEY_UNREAD_COUNT, count);
clientHelper.mergeGroupMetadata(txn, g, d);
}
db.commitTransaction(txn);
} catch (FormatException e) {
throw new DbException(e);
} finally {
db.endTransaction(txn);
}
}
}

View File

@@ -9,6 +9,7 @@ import org.briarproject.api.data.BdfDictionary;
import org.briarproject.api.data.BdfList;
import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.nullsafety.NotNullByDefault;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.InvalidMessageException;
import org.briarproject.api.sync.Message;
@@ -25,6 +26,7 @@ import static org.briarproject.api.clients.QueueMessage.QUEUE_MESSAGE_HEADER_LEN
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE;
@NotNullByDefault
public abstract class BdfMessageValidator implements MessageValidator,
QueueMessageValidator {
@@ -108,7 +110,7 @@ public abstract class BdfMessageValidator implements MessageValidator,
if (b != null && b.length != length) throw new FormatException();
}
protected void checkSize(BdfList list, int minSize, int maxSize)
protected void checkSize(@Nullable BdfList list, int minSize, int maxSize)
throws FormatException {
if (list != null) {
if (list.size() < minSize) throw new FormatException();

View File

@@ -3,6 +3,7 @@ package org.briarproject.clients;
import org.briarproject.api.clients.ClientHelper;
import org.briarproject.api.clients.ContactGroupFactory;
import org.briarproject.api.clients.MessageQueueManager;
import org.briarproject.api.clients.MessageTracker;
import org.briarproject.api.clients.QueueMessageFactory;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfReaderFactory;
@@ -52,4 +53,8 @@ public class ClientsModule {
return new QueueMessageFactoryImpl(crypto);
}
@Provides
MessageTracker provideMessageTracker(MessageTrackerImpl messageTracker) {
return messageTracker;
}
}

View File

@@ -1,6 +1,8 @@
package org.briarproject.clients;
import org.briarproject.api.clients.ClientHelper;
import org.briarproject.api.clients.MessageTracker;
import org.briarproject.api.clients.MessageTracker.GroupCount;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.data.MetadataParser;
@@ -8,25 +10,34 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.messaging.ConversationManager.ConversationClient;
import org.briarproject.api.sync.Group;
import org.briarproject.api.nullsafety.NotNullByDefault;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.MessageId;
@NotNullByDefault
public abstract class ConversationClientImpl extends BdfIncomingMessageHook
implements ConversationClient {
protected ConversationClientImpl(DatabaseComponent db,
ClientHelper clientHelper, MetadataParser metadataParser) {
super(db, clientHelper, metadataParser);
}
protected final MessageTracker messageTracker;
protected abstract Group getContactGroup(Contact contact);
protected ConversationClientImpl(DatabaseComponent db,
ClientHelper clientHelper, MetadataParser metadataParser,
MessageTracker messageTracker) {
super(db, clientHelper, metadataParser);
this.messageTracker = messageTracker;
}
@Override
public GroupCount getGroupCount(Transaction txn, ContactId contactId)
throws DbException {
Contact contact = db.getContact(txn, contactId);
GroupId groupId = getContactGroup(contact).getId();
return getGroupCount(txn, groupId);
return messageTracker.getGroupCount(txn, groupId);
}
@Override
public void setReadFlag(GroupId g, MessageId m, boolean read)
throws DbException {
messageTracker.setReadFlag(g, m, read);
}
}

View File

@@ -11,6 +11,7 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.nullsafety.NotNullByDefault;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
@@ -20,6 +21,7 @@ import org.briarproject.api.sync.MessageContext;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.ValidationManager;
import org.briarproject.api.sync.ValidationManager.IncomingMessageHook;
import org.briarproject.api.sync.ValidationManager.MessageValidator;
import org.briarproject.util.ByteUtils;
import java.util.ArrayList;
@@ -29,12 +31,14 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static org.briarproject.api.clients.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
@NotNullByDefault
class MessageQueueManagerImpl implements MessageQueueManager {
private static final String OUTGOING_POSITION_KEY = "nextOut";
@@ -139,6 +143,7 @@ class MessageQueueManagerImpl implements MessageQueueManager {
this.pending = pending;
}
@Nullable
MessageId popIncomingMessageId() {
Iterator<Entry<Long, MessageId>> it = pending.entrySet().iterator();
if (!it.hasNext()) {
@@ -161,8 +166,9 @@ class MessageQueueManagerImpl implements MessageQueueManager {
}
}
@NotNullByDefault
private static class DelegatingMessageValidator
implements ValidationManager.MessageValidator {
implements MessageValidator {
private final QueueMessageValidator delegate;
@@ -174,21 +180,24 @@ class MessageQueueManagerImpl implements MessageQueueManager {
public MessageContext validateMessage(Message m, Group g)
throws InvalidMessageException {
byte[] raw = m.getRaw();
if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH) return null;
if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH)
throw new InvalidMessageException();
long queuePosition = ByteUtils.readUint64(raw,
MESSAGE_HEADER_LENGTH);
if (queuePosition < 0) return null;
if (queuePosition < 0) throw new InvalidMessageException();
QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(),
m.getTimestamp(), queuePosition, raw);
return delegate.validateMessage(q, g);
}
}
@NotNullByDefault
private class DelegatingIncomingMessageHook implements IncomingMessageHook {
private final IncomingQueueMessageHook delegate;
private DelegatingIncomingMessageHook(IncomingQueueMessageHook delegate) {
private DelegatingIncomingMessageHook(
IncomingQueueMessageHook delegate) {
this.delegate = delegate;
}
@@ -227,20 +236,16 @@ class MessageQueueManagerImpl implements MessageQueueManager {
// Save the queue state before passing control to the delegate
saveQueueState(txn, m.getGroupId(), queueState);
// Deliver the messages to the delegate
try {
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw);
if (LOG.isLoggable(INFO)) {
LOG.info("Delivering pending message with position "
+ q.getQueuePosition());
}
delegate.incomingMessage(txn, q, meta);
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw);
if (LOG.isLoggable(INFO)) {
LOG.info("Delivering pending message with position "
+ q.getQueuePosition());
}
} catch (FormatException e) {
throw new InvalidMessageException(e);
delegate.incomingMessage(txn, q, meta);
}
}
// message queues are only useful for groups with two members

View File

@@ -0,0 +1,132 @@
package org.briarproject.clients;
import org.briarproject.api.FormatException;
import org.briarproject.api.clients.ClientHelper;
import org.briarproject.api.clients.MessageTracker;
import org.briarproject.api.data.BdfDictionary;
import org.briarproject.api.data.BdfEntry;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.nullsafety.NotNullByDefault;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import javax.inject.Inject;
import static org.briarproject.clients.BdfConstants.GROUP_KEY_LATEST_MSG;
import static org.briarproject.clients.BdfConstants.GROUP_KEY_MSG_COUNT;
import static org.briarproject.clients.BdfConstants.GROUP_KEY_UNREAD_COUNT;
import static org.briarproject.clients.BdfConstants.MSG_KEY_READ;
@NotNullByDefault
class MessageTrackerImpl implements MessageTracker {
private final DatabaseComponent db;
private final ClientHelper clientHelper;
@Inject
MessageTrackerImpl(DatabaseComponent db, ClientHelper clientHelper) {
this.db = db;
this.clientHelper = clientHelper;
}
@Override
public void trackIncomingMessage(Transaction txn, Message m)
throws DbException {
trackMessage(txn, m.getGroupId(), m.getTimestamp(), false);
}
@Override
public void trackOutgoingMessage(Transaction txn, Message m)
throws DbException {
trackMessage(txn, m.getGroupId(), m.getTimestamp(), true);
}
@Override
public void trackMessage(Transaction txn, GroupId g, long time,
boolean read) throws DbException {
GroupCount c = getGroupCount(txn, g);
int msgCount = c.getMsgCount() + 1;
int unreadCount = c.getUnreadCount() + (read ? 0 : 1);
long latestMsgTime = Math.max(c.getLatestMsgTime(), time);
storeGroupCount(txn, g, new GroupCount(msgCount, unreadCount,
latestMsgTime));
}
@Override
public GroupCount getGroupCount(GroupId g) throws DbException {
GroupCount count;
Transaction txn = db.startTransaction(true);
try {
count = getGroupCount(txn, g);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return count;
}
@Override
public GroupCount getGroupCount(Transaction txn, GroupId g)
throws DbException {
try {
BdfDictionary d = clientHelper.getGroupMetadataAsDictionary(txn, g);
return new GroupCount(
d.getLong(GROUP_KEY_MSG_COUNT, 0L).intValue(),
d.getLong(GROUP_KEY_UNREAD_COUNT, 0L).intValue(),
d.getLong(GROUP_KEY_LATEST_MSG, 0L)
);
} catch (FormatException e) {
throw new DbException(e);
}
}
private void storeGroupCount(Transaction txn, GroupId g, GroupCount c)
throws DbException {
try {
BdfDictionary d = BdfDictionary.of(
new BdfEntry(GROUP_KEY_MSG_COUNT, c.getMsgCount()),
new BdfEntry(GROUP_KEY_UNREAD_COUNT, c.getUnreadCount()),
new BdfEntry(GROUP_KEY_LATEST_MSG, c.getLatestMsgTime())
);
clientHelper.mergeGroupMetadata(txn, g, d);
} catch (FormatException e) {
throw new DbException(e);
}
}
@Override
public void setReadFlag(GroupId g, MessageId m, boolean read)
throws DbException {
Transaction txn = db.startTransaction(false);
try {
// check current read status of message
BdfDictionary old =
clientHelper.getMessageMetadataAsDictionary(txn, m);
boolean wasRead = old.getBoolean(MSG_KEY_READ, false);
// if status changed
if (wasRead != read) {
// mark individual message as read
BdfDictionary meta = new BdfDictionary();
meta.put(MSG_KEY_READ, read);
clientHelper.mergeMessageMetadata(txn, m, meta);
// update unread counter in group metadata
GroupCount c = getGroupCount(txn, g);
int unreadCount = c.getUnreadCount() + (read ? -1 : 1);
if (unreadCount < 0) throw new DbException();
storeGroupCount(txn, g, new GroupCount(c.getMsgCount(),
unreadCount, c.getLatestMsgTime()));
}
db.commitTransaction(txn);
} catch (FormatException e) {
throw new DbException(e);
} finally {
db.endTransaction(txn);
}
}
}