Merge branch '589-when-a-message-is-shared-share-its-transitive-dependencies' into 'master'

When a message is shared, share its transitive dependencies

Like other recursive operations on the dependency graph, this is
not done in a single transaction to prevent an attacker from creating
arbitrary large transactions.

So at startup, the `ValidationManager` finds and resumes any
unfinished operations, by looking for unshared messages with shared
dependents.

Closes #589

See merge request !325
This commit is contained in:
akwizgran
2016-09-28 16:19:43 +00:00
19 changed files with 480 additions and 76 deletions

View File

@@ -68,12 +68,6 @@ public interface ClientHelper {
void mergeMessageMetadata(Transaction txn, MessageId m,
BdfDictionary metadata) throws DbException, FormatException;
/**
* Marks the given message as shared or unshared with other contacts.
*/
void setMessageShared(Transaction txn, MessageId m, boolean shared)
throws DbException;
byte[] toByteArray(BdfDictionary dictionary) throws FormatException;
byte[] toByteArray(BdfList list) throws FormatException;

View File

@@ -258,6 +258,15 @@ public interface DatabaseComponent {
Collection<MessageId> getPendingMessages(Transaction txn, ClientId c)
throws DbException;
/**
* Returns the IDs of any messages from the given client
* that have a shared dependent, but are still not shared themselves.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToShare(Transaction txn,
ClientId c) throws DbException;
/**
* Returns the message with the given ID, in serialised form, or null if
* the message has been deleted.
@@ -456,10 +465,9 @@ public interface DatabaseComponent {
throws DbException;
/**
* Marks the given message as shared or unshared.
* Marks the given message as shared.
*/
void setMessageShared(Transaction txn, MessageId m, boolean shared)
throws DbException;
void setMessageShared(Transaction txn, MessageId m) throws DbException;
/**
* Sets the validation and delivery state of the given message.

View File

@@ -55,8 +55,9 @@ public interface ValidationManager {
/**
* Called once for each incoming message that passes validation.
* @return whether or not this message should be shared
*/
void incomingMessage(Transaction txn, Message m, Metadata meta)
boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException;
}
}

View File

@@ -176,7 +176,7 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager,
}
@Override
protected void incomingMessage(Transaction txn, Message m, BdfList list,
protected boolean incomingMessage(Transaction txn, Message m, BdfList list,
BdfDictionary meta) throws DbException, FormatException {
GroupId groupId = m.getGroupId();
@@ -196,13 +196,14 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager,
throw new FormatException();
}
}
// share dependencies recursively - TODO remove with #598
share(txn, h);
// broadcast event about new post or comment
BlogPostAddedEvent event =
new BlogPostAddedEvent(groupId, h, false);
txn.attach(event);
// shares message and its dependencies
return true;
} else if (type == WRAPPED_COMMENT) {
// Check that the original message ID in the dependency's metadata
// matches the original parent ID of the wrapped comment
@@ -216,6 +217,8 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager,
throw new FormatException();
}
}
// don't share message until parent arrives
return false;
}
@Override
@@ -672,14 +675,4 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager,
Long longType = d.getLong(KEY_TYPE);
return MessageType.valueOf(longType.intValue());
}
// TODO remove when implementing #589
@Deprecated
private void share(Transaction txn, BlogPostHeader h) throws DbException {
clientHelper.setMessageShared(txn, h.getId(), true);
if (h instanceof BlogCommentHeader) {
BlogPostHeader h2 = ((BlogCommentHeader) h).getParent();
share(txn, h2);
}
}
}

View File

@@ -28,14 +28,14 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
this.metadataParser = metadataParser;
}
protected abstract void incomingMessage(Transaction txn, Message m,
protected abstract boolean incomingMessage(Transaction txn, Message m,
BdfList body, BdfDictionary meta) throws DbException,
FormatException;
@Override
public void incomingMessage(Transaction txn, Message m, Metadata meta)
public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException {
incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH);
return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH);
}
@Override
@@ -44,14 +44,14 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH);
}
private void incomingMessage(Transaction txn, Message m, Metadata meta,
private boolean incomingMessage(Transaction txn, Message m, Metadata meta,
int headerLength) throws DbException {
try {
byte[] raw = m.getRaw();
BdfList body = clientHelper.toList(raw, headerLength,
raw.length - headerLength);
BdfDictionary metaDictionary = metadataParser.parse(meta);
incomingMessage(txn, m, body, metaDictionary);
return incomingMessage(txn, m, body, metaDictionary);
} catch (FormatException e) {
throw new DbException(e);
}

View File

@@ -245,12 +245,6 @@ class ClientHelperImpl implements ClientHelper {
db.mergeMessageMetadata(txn, m, metadataEncoder.encode(metadata));
}
@Override
public void setMessageShared(Transaction txn, MessageId m, boolean shared)
throws DbException {
db.setMessageShared(txn, m, shared);
}
@Override
public byte[] toByteArray(BdfDictionary dictionary) throws FormatException {
ByteArrayOutputStream out = new ByteArrayOutputStream();

View File

@@ -188,12 +188,12 @@ class MessageQueueManagerImpl implements MessageQueueManager {
private final IncomingQueueMessageHook delegate;
DelegatingIncomingMessageHook(IncomingQueueMessageHook delegate) {
private DelegatingIncomingMessageHook(IncomingQueueMessageHook delegate) {
this.delegate = delegate;
}
@Override
public void incomingMessage(Transaction txn, Message m, Metadata meta)
public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException {
long queuePosition = ByteUtils.readUint64(m.getRaw(),
MESSAGE_HEADER_LENGTH);
@@ -239,6 +239,9 @@ class MessageQueueManagerImpl implements MessageQueueManager {
delegate.incomingMessage(txn, q, meta);
}
}
// message queues are only useful for groups with two members
// so messages don't need to be shared
return false;
}
}
}

View File

@@ -434,6 +434,15 @@ interface Database<T> {
Collection<MessageId> getPendingMessages(T txn, ClientId c)
throws DbException;
/**
* Returns the IDs of any messages from the given client
* that have a shared dependent, but are still not shared themselves.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToShare(T txn, ClientId c)
throws DbException;
/**
* Returns the message with the given ID, in serialised form, or null if
* the message has been deleted.
@@ -599,10 +608,9 @@ interface Database<T> {
throws DbException;
/**
* Marks the given message as shared or unshared.
* Marks the given message as shared.
*/
void setMessageShared(T txn, MessageId m, boolean shared)
throws DbException;
void setMessageShared(T txn, MessageId m) throws DbException;
/**
* Sets the validation and delivery state of the given message.

View File

@@ -94,6 +94,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
this.shutdown = shutdown;
}
@Override
public boolean open() throws DbException {
Runnable shutdownHook = new Runnable() {
public void run() {
@@ -110,12 +111,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return reopened;
}
@Override
public void close() throws DbException {
if (closed.getAndSet(true)) return;
shutdown.removeShutdownHook(shutdownHandle);
db.close();
}
@Override
public Transaction startTransaction(boolean readOnly) throws DbException {
// Don't allow reentrant locking
if (lock.getReadHoldCount() > 0) throw new IllegalStateException();
@@ -135,6 +138,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void endTransaction(Transaction transaction) throws DbException {
try {
T txn = txnClass.cast(transaction.unbox());
@@ -153,6 +157,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return txnClass.cast(transaction.unbox());
}
@Override
public ContactId addContact(Transaction transaction, Author remote,
AuthorId local, boolean verified, boolean active)
throws DbException {
@@ -170,6 +175,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return c;
}
@Override
public void addGroup(Transaction transaction, Group g) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
@@ -179,6 +185,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void addLocalAuthor(Transaction transaction, LocalAuthor a)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -189,6 +196,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void addLocalMessage(Transaction transaction, Message m,
Metadata meta, boolean shared) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -215,6 +223,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void addTransport(Transaction transaction, TransportId t,
int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -223,6 +232,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.addTransport(txn, t, maxLatency);
}
@Override
public void addTransportKeys(Transaction transaction, ContactId c,
TransportKeys k) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -234,6 +244,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.addTransportKeys(txn, c, k);
}
@Override
public boolean containsContact(Transaction transaction, AuthorId remote,
AuthorId local) throws DbException {
T txn = unbox(transaction);
@@ -242,6 +253,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.containsContact(txn, remote, local);
}
@Override
public boolean containsGroup(Transaction transaction, GroupId g)
throws DbException {
T txn = unbox(transaction);
@@ -255,6 +267,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.containsLocalAuthor(txn, local);
}
@Override
public void deleteMessage(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -264,6 +277,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.deleteMessage(txn, m);
}
@Override
public void deleteMessageMetadata(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -274,6 +288,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Nullable
@Override
public Ack generateAck(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -287,6 +302,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Nullable
@Override
public Collection<byte[]> generateBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -306,6 +322,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Nullable
@Override
public Offer generateOffer(Transaction transaction, ContactId c,
int maxMessages, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -319,6 +336,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Nullable
@Override
public Request generateRequest(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -333,6 +351,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Nullable
@Override
public Collection<byte[]> generateRequestedBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -352,6 +371,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return Collections.unmodifiableList(messages);
}
@Override
public Contact getContact(Transaction transaction, ContactId c)
throws DbException {
T txn = unbox(transaction);
@@ -360,18 +380,21 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getContact(txn, c);
}
@Override
public Collection<Contact> getContacts(Transaction transaction)
throws DbException {
T txn = unbox(transaction);
return db.getContacts(txn);
}
@Override
public Collection<Contact> getContactsByAuthorId(Transaction transaction,
AuthorId remote) throws DbException {
T txn = unbox(transaction);
return db.getContactsByAuthorId(txn, remote);
}
@Override
public Collection<ContactId> getContacts(Transaction transaction,
AuthorId a) throws DbException {
T txn = unbox(transaction);
@@ -380,11 +403,13 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getContacts(txn, a);
}
@Override
public DeviceId getDeviceId(Transaction transaction) throws DbException {
T txn = unbox(transaction);
return db.getDeviceId(txn);
}
@Override
public Group getGroup(Transaction transaction, GroupId g)
throws DbException {
T txn = unbox(transaction);
@@ -393,6 +418,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getGroup(txn, g);
}
@Override
public Metadata getGroupMetadata(Transaction transaction, GroupId g)
throws DbException {
T txn = unbox(transaction);
@@ -401,12 +427,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getGroupMetadata(txn, g);
}
@Override
public Collection<Group> getGroups(Transaction transaction, ClientId c)
throws DbException {
T txn = unbox(transaction);
return db.getGroups(txn, c);
}
@Override
public LocalAuthor getLocalAuthor(Transaction transaction, AuthorId a)
throws DbException {
T txn = unbox(transaction);
@@ -415,25 +443,36 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getLocalAuthor(txn, a);
}
@Override
public Collection<LocalAuthor> getLocalAuthors(Transaction transaction)
throws DbException {
T txn = unbox(transaction);
return db.getLocalAuthors(txn);
}
@Override
public Collection<MessageId> getMessagesToValidate(Transaction transaction,
ClientId c) throws DbException {
T txn = unbox(transaction);
return db.getMessagesToValidate(txn, c);
}
@Override
public Collection<MessageId> getPendingMessages(Transaction transaction,
ClientId c) throws DbException {
T txn = unbox(transaction);
return db.getPendingMessages(txn, c);
}
@Override
public Collection<MessageId> getMessagesToShare(
Transaction transaction, ClientId c) throws DbException {
T txn = unbox(transaction);
return db.getMessagesToShare(txn, c);
}
@Nullable
@Override
public byte[] getRawMessage(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
@@ -442,6 +481,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getRawMessage(txn, m);
}
@Override
public Map<MessageId, Metadata> getMessageMetadata(Transaction transaction,
GroupId g) throws DbException {
T txn = unbox(transaction);
@@ -450,6 +490,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageMetadata(txn, g);
}
@Override
public Map<MessageId, Metadata> getMessageMetadata(Transaction transaction,
GroupId g, Metadata query) throws DbException {
T txn = unbox(transaction);
@@ -458,6 +499,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageMetadata(txn, g, query);
}
@Override
public Metadata getMessageMetadata(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
@@ -466,6 +508,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageMetadata(txn, m);
}
@Override
public Metadata getMessageMetadataForValidator(Transaction transaction,
MessageId m)
throws DbException {
@@ -475,6 +518,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageMetadataForValidator(txn, m);
}
@Override
public State getMessageState(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
@@ -483,6 +527,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageState(txn, m);
}
@Override
public Collection<MessageStatus> getMessageStatus(Transaction transaction,
ContactId c, GroupId g) throws DbException {
T txn = unbox(transaction);
@@ -493,6 +538,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageStatus(txn, c, g);
}
@Override
public MessageStatus getMessageStatus(Transaction transaction, ContactId c,
MessageId m) throws DbException {
T txn = unbox(transaction);
@@ -503,6 +549,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageStatus(txn, c, m);
}
@Override
public Map<MessageId, State> getMessageDependencies(Transaction transaction,
MessageId m) throws DbException {
T txn = unbox(transaction);
@@ -511,6 +558,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageDependencies(txn, m);
}
@Override
public Map<MessageId, State> getMessageDependents(Transaction transaction,
MessageId m) throws DbException {
T txn = unbox(transaction);
@@ -519,12 +567,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageDependents(txn, m);
}
@Override
public Settings getSettings(Transaction transaction, String namespace)
throws DbException {
T txn = unbox(transaction);
return db.getSettings(txn, namespace);
}
@Override
public Map<ContactId, TransportKeys> getTransportKeys(
Transaction transaction, TransportId t) throws DbException {
T txn = unbox(transaction);
@@ -533,6 +583,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getTransportKeys(txn, t);
}
@Override
public void incrementStreamCounter(Transaction transaction, ContactId c,
TransportId t, long rotationPeriod) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -544,6 +595,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.incrementStreamCounter(txn, c, t, rotationPeriod);
}
@Override
public boolean isVisibleToContact(Transaction transaction, ContactId c,
GroupId g) throws DbException {
T txn = unbox(transaction);
@@ -554,6 +606,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.containsVisibleGroup(txn, c, g);
}
@Override
public void mergeGroupMetadata(Transaction transaction, GroupId g,
Metadata meta) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -563,6 +616,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.mergeGroupMetadata(txn, g, meta);
}
@Override
public void mergeMessageMetadata(Transaction transaction, MessageId m,
Metadata meta) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -572,6 +626,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.mergeMessageMetadata(txn, m, meta);
}
@Override
public void mergeSettings(Transaction transaction, Settings s,
String namespace) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -586,6 +641,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void receiveAck(Transaction transaction, ContactId c, Ack a)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -602,6 +658,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new MessagesAckedEvent(c, acked));
}
@Override
public void receiveMessage(Transaction transaction, ContactId c, Message m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -620,6 +677,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void receiveOffer(Transaction transaction, ContactId c, Offer o)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -643,6 +701,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (request) transaction.attach(new MessageToRequestEvent(c));
}
@Override
public void receiveRequest(Transaction transaction, ContactId c, Request r)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -660,6 +719,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (requested) transaction.attach(new MessageRequestedEvent(c));
}
@Override
public void removeContact(Transaction transaction, ContactId c)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -670,6 +730,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new ContactRemovedEvent(c));
}
@Override
public void removeGroup(Transaction transaction, Group g)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -683,6 +744,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
}
@Override
public void removeLocalAuthor(Transaction transaction, AuthorId a)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -693,6 +755,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new LocalAuthorRemovedEvent(a));
}
@Override
public void removeTransport(Transaction transaction, TransportId t)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -702,6 +765,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.removeTransport(txn, t);
}
@Override
public void setContactVerified(Transaction transaction, ContactId c)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -712,6 +776,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new ContactVerifiedEvent(c));
}
@Override
public void setContactActive(Transaction transaction, ContactId c,
boolean active) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -722,16 +787,20 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new ContactStatusChangedEvent(c, active));
}
public void setMessageShared(Transaction transaction, MessageId m,
boolean shared) throws DbException {
@Override
public void setMessageShared(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
db.setMessageShared(txn, m, shared);
if (shared) transaction.attach(new MessageSharedEvent(m));
if (db.getMessageState(txn, m) != DELIVERED)
throw new IllegalArgumentException("Shared undelivered message");
db.setMessageShared(txn, m);
transaction.attach(new MessageSharedEvent(m));
}
@Override
public void setMessageState(Transaction transaction, MessageId m,
State state) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -742,6 +811,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new MessageStateChangedEvent(m, false, state));
}
@Override
public void addMessageDependencies(Transaction transaction,
Message dependent, Collection<MessageId> dependencies)
throws DbException {
@@ -755,6 +825,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void setReorderingWindow(Transaction transaction, ContactId c,
TransportId t, long rotationPeriod, long base, byte[] bitmap)
throws DbException {
@@ -767,6 +838,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.setReorderingWindow(txn, c, t, rotationPeriod, base, bitmap);
}
@Override
public void setVisibleToContact(Transaction transaction, ContactId c,
GroupId g, boolean visible) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
@@ -793,6 +865,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
@Override
public void updateTransportKeys(Transaction transaction,
Map<ContactId, TransportKeys> keys) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();

View File

@@ -1692,6 +1692,35 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<MessageId> getMessagesToShare(
Connection txn, ClientId c) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN messageDependencies AS d"
+ " ON m.messageId = d.dependencyId"
+ " JOIN messages AS m1"
+ " ON d.messageId = m1.messageId"
+ " JOIN groups AS g"
+ " ON m.groupId = g.groupId"
+ " WHERE m.shared = FALSE AND m1.shared = TRUE"
+ " AND g.clientId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, c.getBytes());
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
@Nullable
public byte[] getRawMessage(Connection txn, MessageId m)
throws DbException {
@@ -2321,14 +2350,13 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setMessageShared(Connection txn, MessageId m, boolean shared)
throws DbException {
public void setMessageShared(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE messages SET shared = ? WHERE messageId = ?";
String sql = "UPDATE messages SET shared = TRUE"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBoolean(1, shared);
ps.setBytes(2, m.getBytes());
ps.setBytes(1, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();

View File

@@ -73,15 +73,16 @@ class ForumManagerImpl extends BdfIncomingMessageHook implements ForumManager {
}
@Override
protected void incomingMessage(Transaction txn, Message m, BdfList body,
protected boolean incomingMessage(Transaction txn, Message m, BdfList body,
BdfDictionary meta) throws DbException, FormatException {
clientHelper.setMessageShared(txn, m.getId(), true);
ForumPostHeader post = getForumPostHeader(txn, m.getId(), meta);
ForumPostReceivedEvent event =
new ForumPostReceivedEvent(post, m.getGroupId());
txn.attach(event);
// share message
return true;
}
@Override

View File

@@ -207,7 +207,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook
* in the introduction protocol and which engine we need to start.
*/
@Override
protected void incomingMessage(Transaction txn, Message m, BdfList body,
protected boolean incomingMessage(Transaction txn, Message m, BdfList body,
BdfDictionary message) throws DbException {
// Get message data and type
@@ -233,7 +233,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook
LOG.log(WARNING, e.toString(), e);
}
deleteMessage(txn, m.getId());
return;
return false;
}
try {
introduceeManager.incomingMessage(txn, state, message);
@@ -254,7 +254,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook
} catch (FormatException e) {
LOG.warning("Could not find state for message, deleting...");
deleteMessage(txn, m.getId());
return;
return false;
}
long role = state.getLong(ROLE, -1L);
@@ -285,6 +285,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook
LOG.warning("Unknown message type '" + type + "', deleting...");
}
}
return false;
}
@Override

View File

@@ -93,7 +93,7 @@ class MessagingManagerImpl extends BdfIncomingMessageHook
}
@Override
protected void incomingMessage(Transaction txn, Message m, BdfList body,
protected boolean incomingMessage(Transaction txn, Message m, BdfList body,
BdfDictionary meta) throws DbException, FormatException {
GroupId groupId = m.getGroupId();
@@ -106,6 +106,9 @@ class MessagingManagerImpl extends BdfIncomingMessageHook
PrivateMessageReceivedEvent event = new PrivateMessageReceivedEvent(
header, groupId);
txn.attach(event);
// don't share message
return false;
}
@Override

View File

@@ -192,7 +192,7 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS
}
@Override
protected void incomingMessage(Transaction txn, Message m, BdfList body,
protected boolean incomingMessage(Transaction txn, Message m, BdfList body,
BdfDictionary d) throws DbException, FormatException {
BaseMessage msg = BaseMessage.from(getIFactory(), m.getGroupId(), d);
@@ -263,6 +263,8 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS
// message has passed validator, so that should never happen
throw new RuntimeException("Illegal Sharing Message");
}
// don't share message as other party already has it
return false;
}
@Override

View File

@@ -72,6 +72,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
for (ClientId c : validators.keySet()) {
validateOutstandingMessagesAsync(c);
deliverOutstandingMessagesAsync(c);
shareOutstandingMessagesAsync(c);
}
}
@@ -191,6 +192,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
private void deliverNextPendingMessage(Queue<MessageId> pending) {
try {
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> toShare = null;
Queue<MessageId> invalidate = null;
Transaction txn = db.startTransaction(false);
try {
@@ -213,8 +215,14 @@ class ValidationManagerImpl implements ValidationManager, Service,
ClientId c = g.getClientId();
Metadata meta = db.getMessageMetadataForValidator(txn,
id);
if (deliverMessage(txn, m, c, meta)) {
DeliveryResult result = deliverMessage(txn, m, c, meta);
if (result.valid) {
pending.addAll(getPendingDependents(txn, id));
if (result.share) {
db.setMessageShared(txn, id);
toShare = new LinkedList<MessageId>(
states.keySet());
}
} else {
invalidate = getDependentsToInvalidate(txn, id);
}
@@ -226,6 +234,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
deliverNextPendingMessageAsync(pending);
if (toShare != null) shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before delivery");
deliverNextPendingMessageAsync(pending);
@@ -284,16 +293,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
private void storeMessageContext(Message m, ClientId c,
MessageContext result) {
MessageContext context) {
try {
MessageId id = m.getId();
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> invalidate = null;
Queue<MessageId> pending = null;
Queue<MessageId> toShare = null;
Transaction txn = db.startTransaction(false);
try {
// Check if message has any dependencies
Collection<MessageId> dependencies = result.getDependencies();
Collection<MessageId> dependencies = context.getDependencies();
if (!dependencies.isEmpty()) {
db.addMessageDependencies(txn, m, dependencies);
// Check if dependencies are valid and delivered
@@ -310,11 +320,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
invalidate = getDependentsToInvalidate(txn, id);
}
} else {
Metadata meta = result.getMetadata();
Metadata meta = context.getMetadata();
db.mergeMessageMetadata(txn, id, meta);
if (allDelivered) {
if (deliverMessage(txn, m, c, meta)) {
DeliveryResult result = deliverMessage(txn, m, c, meta);
if (result.valid) {
pending = getPendingDependents(txn, id);
if (result.share) {
db.setMessageShared(txn, id);
toShare =
new LinkedList<MessageId>(dependencies);
}
} else {
invalidate = getDependentsToInvalidate(txn, id);
}
@@ -328,6 +344,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
if (pending != null) deliverNextPendingMessageAsync(pending);
if (toShare != null) shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");
} catch (NoSuchGroupException e) {
@@ -337,18 +354,21 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
}
private boolean deliverMessage(Transaction txn, Message m, ClientId c,
Metadata meta) throws DbException {
private DeliveryResult deliverMessage(Transaction txn, Message m,
ClientId c, Metadata meta) throws DbException {
// Deliver the message to the client if it's registered a hook
boolean shareMsg = false;
IncomingMessageHook hook = hooks.get(c);
if (hook != null) hook.incomingMessage(txn, m, meta);
if (hook != null) {
shareMsg = hook.incomingMessage(txn, m, meta);
}
// TODO: Find a better way for clients to signal validity, #643
if (db.getRawMessage(txn, m.getId()) == null) {
db.setMessageState(txn, m.getId(), INVALID);
return false;
return new DeliveryResult(false, false);
} else {
db.setMessageState(txn, m.getId(), DELIVERED);
return true;
return new DeliveryResult(true, shareMsg);
}
}
@@ -362,6 +382,70 @@ class ValidationManagerImpl implements ValidationManager, Service,
return pending;
}
private void shareOutstandingMessagesAsync(final ClientId c) {
dbExecutor.execute(new Runnable() {
@Override
public void run() {
shareOutstandingMessages(c);
}
});
}
private void shareOutstandingMessages(ClientId c) {
try {
Queue<MessageId> toShare = new LinkedList<MessageId>();
Transaction txn = db.startTransaction(true);
try {
toShare.addAll(db.getMessagesToShare(txn, c));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
shareNextMessageAsync(toShare);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
/**
* Shares the next message from the toShare queue asynchronously.
*
* This method should only be called for messages that have all their
* dependencies delivered and have been delivered themselves.
*/
private void shareNextMessageAsync(final Queue<MessageId> toShare) {
if (toShare.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@Override
public void run() {
shareNextMessage(toShare);
}
});
}
private void shareNextMessage(Queue<MessageId> toShare) {
try {
Transaction txn = db.startTransaction(false);
try {
MessageId id = toShare.poll();
db.setMessageShared(txn, id);
toShare.addAll(db.getMessageDependencies(txn, id).keySet());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before sharing");
shareNextMessageAsync(toShare);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before sharing");
shareNextMessageAsync(toShare);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void invalidateNextMessageAsync(final Queue<MessageId> invalidate) {
if (invalidate.isEmpty()) return;
dbExecutor.execute(new Runnable() {
@@ -447,4 +531,13 @@ class ValidationManagerImpl implements ValidationManager, Service,
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private static class DeliveryResult {
private final boolean valid, share;
private DeliveryResult(boolean valid, boolean share) {
this.valid = valid;
this.share = share;
}
}
}

View File

@@ -199,7 +199,6 @@ public class BlogManagerImplTest extends BriarTestCase {
);
context.checking(new Expectations() {{
oneOf(clientHelper).setMessageShared(txn, messageId, true);
oneOf(identityManager)
.getAuthorStatus(txn, blog1.getAuthor().getId());
will(returnValue(VERIFIED));

View File

@@ -760,7 +760,7 @@ public class DatabaseComponentImplTest extends BriarTestCase {
transaction = db.startTransaction(false);
try {
db.setMessageShared(transaction, message.getId(), true);
db.setMessageShared(transaction, message.getId());
fail();
} catch (NoSuchMessageException expected) {
// Expected

View File

@@ -272,19 +272,12 @@ public class H2DatabaseTest extends BriarTestCase {
assertTrue(ids.isEmpty());
// Sharing the message should make it sendable
db.setMessageShared(txn, messageId, true);
db.setMessageShared(txn, messageId);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE);
assertEquals(Collections.singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100);
assertEquals(Collections.singletonList(messageId), ids);
// Unsharing the message should make it unsendable
db.setMessageShared(txn, messageId, false);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE);
assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100);
assertTrue(ids.isEmpty());
db.commitTransaction(txn);
db.close();
}
@@ -1363,6 +1356,43 @@ public class H2DatabaseTest extends BriarTestCase {
db.close();
}
@Test
public void testGetMessagesToShare() throws Exception {
MessageId mId1 = new MessageId(TestUtils.getRandomId());
MessageId mId2 = new MessageId(TestUtils.getRandomId());
MessageId mId3 = new MessageId(TestUtils.getRandomId());
MessageId mId4 = new MessageId(TestUtils.getRandomId());
Message m1 = new Message(mId1, groupId, timestamp, raw);
Message m2 = new Message(mId2, groupId, timestamp, raw);
Message m3 = new Message(mId3, groupId, timestamp, raw);
Message m4 = new Message(mId4, groupId, timestamp, raw);
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
// Add a group and some messages
db.addGroup(txn, group);
db.addMessage(txn, m1, DELIVERED, true);
db.addMessage(txn, m2, DELIVERED, false);
db.addMessage(txn, m3, DELIVERED, false);
db.addMessage(txn, m4, DELIVERED, true);
// Introduce dependencies between the messages
db.addMessageDependency(txn, groupId, mId1, mId2);
db.addMessageDependency(txn, groupId, mId3, mId1);
db.addMessageDependency(txn, groupId, mId4, mId3);
// Retrieve messages to be shared
Collection<MessageId> result =
db.getMessagesToShare(txn, clientId);
assertEquals(2, result.size());
assertTrue(result.contains(mId2));
assertTrue(result.contains(mId3));
db.commitTransaction(txn);
db.close();
}
@Test
public void testGetMessageStatus() throws Exception {
Database<Connection> db = open(false);

View File

@@ -84,6 +84,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
final Transaction txn3 = new Transaction(null, true);
final Transaction txn4 = new Transaction(null, false);
final Transaction txn5 = new Transaction(null, true);
final Transaction txn6 = new Transaction(null, true);
context.checking(new Expectations() {{
// Get messages to validate
oneOf(db).startTransaction(true);
@@ -108,6 +109,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).mergeMessageMetadata(txn2, messageId, metadata);
// Deliver the first message
oneOf(hook).incomingMessage(txn2, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn2, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn2, messageId, DELIVERED);
@@ -144,6 +146,12 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).getPendingMessages(txn5, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn5);
// Get messages to share
oneOf(db).startTransaction(true);
will(returnValue(txn6));
oneOf(db).getMessagesToShare(txn6, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn6);
}});
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
@@ -160,6 +168,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
assertTrue(txn3.isComplete());
assertTrue(txn4.isComplete());
assertTrue(txn5.isComplete());
assertTrue(txn6.isComplete());
}
@Test
@@ -175,6 +184,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
final Transaction txn1 = new Transaction(null, true);
final Transaction txn2 = new Transaction(null, false);
final Transaction txn3 = new Transaction(null, false);
final Transaction txn4 = new Transaction(null, true);
context.checking(new Expectations() {{
// Get messages to validate
@@ -205,6 +215,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
will(returnValue(new Metadata()));
// Deliver the message
oneOf(hook).incomingMessage(txn2, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn2, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn2, messageId, DELIVERED);
@@ -228,6 +239,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
will(returnValue(metadata));
// Deliver the dependent
oneOf(hook).incomingMessage(txn3, message2, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn3, messageId2);
will(returnValue(raw));
oneOf(db).setMessageState(txn3, messageId2, DELIVERED);
@@ -235,6 +247,13 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).getMessageDependents(txn3, messageId2);
will(returnValue(Collections.emptyMap()));
oneOf(db).endTransaction(txn3);
// Get messages to share
oneOf(db).startTransaction(true);
will(returnValue(txn4));
oneOf(db).getMessagesToShare(txn4, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn4);
}});
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
@@ -249,6 +268,137 @@ public class ValidationManagerImplTest extends BriarTestCase {
assertTrue(txn1.isComplete());
assertTrue(txn2.isComplete());
assertTrue(txn3.isComplete());
assertTrue(txn4.isComplete());
}
@Test
public void testMessagesAreSharedAtStartup() throws Exception {
Mockery context = new Mockery();
final DatabaseComponent db = context.mock(DatabaseComponent.class);
final Executor dbExecutor = new ImmediateExecutor();
final Executor cryptoExecutor = new ImmediateExecutor();
final MessageValidator validator = context.mock(MessageValidator.class);
final IncomingMessageHook hook =
context.mock(IncomingMessageHook.class);
final Transaction txn = new Transaction(null, true);
final Transaction txn1 = new Transaction(null, true);
final Transaction txn2 = new Transaction(null, true);
final Transaction txn3 = new Transaction(null, false);
final Transaction txn4 = new Transaction(null, false);
context.checking(new Expectations() {{
// No messages to validate
oneOf(db).startTransaction(true);
will(returnValue(txn));
oneOf(db).getMessagesToValidate(txn, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn);
// No pending messages to deliver
oneOf(db).startTransaction(true);
will(returnValue(txn1));
oneOf(db).getPendingMessages(txn1, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn1);
// Get messages to share
oneOf(db).startTransaction(true);
will(returnValue(txn2));
oneOf(db).getMessagesToShare(txn2, clientId);
will(returnValue(Collections.singletonList(messageId)));
oneOf(db).endTransaction(txn2);
// Share message and get dependencies
oneOf(db).startTransaction(false);
will(returnValue(txn3));
oneOf(db).setMessageShared(txn3, messageId);
oneOf(db).getMessageDependencies(txn3, messageId);
will(returnValue(Collections.singletonMap(messageId2, DELIVERED)));
oneOf(db).endTransaction(txn3);
// Share dependency
oneOf(db).startTransaction(false);
will(returnValue(txn4));
oneOf(db).setMessageShared(txn4, messageId2);
oneOf(db).getMessageDependencies(txn4, messageId2);
will(returnValue(Collections.emptyMap()));
oneOf(db).endTransaction(txn4);
}});
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
cryptoExecutor);
vm.registerMessageValidator(clientId, validator);
vm.registerIncomingMessageHook(clientId, hook);
vm.startService();
context.assertIsSatisfied();
assertTrue(txn.isComplete());
assertTrue(txn1.isComplete());
assertTrue(txn2.isComplete());
assertTrue(txn3.isComplete());
assertTrue(txn4.isComplete());
}
@Test
public void testIncomingMessagesAreShared() throws Exception {
Mockery context = new Mockery();
final DatabaseComponent db = context.mock(DatabaseComponent.class);
final Executor dbExecutor = new ImmediateExecutor();
final Executor cryptoExecutor = new ImmediateExecutor();
final MessageValidator validator = context.mock(MessageValidator.class);
final IncomingMessageHook hook =
context.mock(IncomingMessageHook.class);
final Transaction txn = new Transaction(null, true);
final Transaction txn1 = new Transaction(null, false);
final Transaction txn2 = new Transaction(null, false);
context.checking(new Expectations() {{
// Load the group
oneOf(db).startTransaction(true);
will(returnValue(txn));
oneOf(db).getGroup(txn, groupId);
will(returnValue(group));
oneOf(db).endTransaction(txn);
// Validate the message: valid
oneOf(validator).validateMessage(message, group);
will(returnValue(validResultWithDependencies));
// Store the validation result
oneOf(db).startTransaction(false);
will(returnValue(txn1));
oneOf(db).addMessageDependencies(txn1, message,
validResultWithDependencies.getDependencies());
oneOf(db).getMessageDependencies(txn1, messageId);
will(returnValue(Collections.singletonMap(messageId1, DELIVERED)));
oneOf(db).mergeMessageMetadata(txn1, messageId, metadata);
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(true));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
// Get any pending dependents
oneOf(db).getMessageDependents(txn1, messageId);
will(returnValue(Collections.emptyMap()));
// Share message
oneOf(db).setMessageShared(txn1, messageId);
oneOf(db).endTransaction(txn1);
// Share dependencies
oneOf(db).startTransaction(false);
will(returnValue(txn2));
oneOf(db).setMessageShared(txn2, messageId1);
oneOf(db).getMessageDependencies(txn2, messageId1);
will(returnValue(Collections.emptyMap()));
oneOf(db).endTransaction(txn2);
}});
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
cryptoExecutor);
vm.registerMessageValidator(clientId, validator);
vm.registerIncomingMessageHook(clientId, hook);
vm.eventOccurred(new MessageAddedEvent(message, contactId));
context.assertIsSatisfied();
assertTrue(txn.isComplete());
assertTrue(txn1.isComplete());
assertTrue(txn2.isComplete());
}
@Test
@@ -266,6 +416,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
final Transaction txn2 = new Transaction(null, true);
final Transaction txn3 = new Transaction(null, false);
final Transaction txn4 = new Transaction(null, true);
final Transaction txn5 = new Transaction(null, true);
context.checking(new Expectations() {{
// Get messages to validate
oneOf(db).startTransaction(true);
@@ -308,6 +459,12 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).getPendingMessages(txn4, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn4);
// Get messages to share
oneOf(db).startTransaction(true);
will(returnValue(txn5));
oneOf(db).getMessagesToShare(txn5, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn5);
}});
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
@@ -323,6 +480,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
assertTrue(txn2.isComplete());
assertTrue(txn3.isComplete());
assertTrue(txn4.isComplete());
assertTrue(txn5.isComplete());
}
@Test
@@ -340,6 +498,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
final Transaction txn2 = new Transaction(null, true);
final Transaction txn3 = new Transaction(null, false);
final Transaction txn4 = new Transaction(null, true);
final Transaction txn5 = new Transaction(null, true);
context.checking(new Expectations() {{
// Get messages to validate
oneOf(db).startTransaction(true);
@@ -385,6 +544,12 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).getPendingMessages(txn4, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn4);
// Get messages to share
oneOf(db).startTransaction(true);
will(returnValue(txn5));
oneOf(db).getMessagesToShare(txn5, clientId);
will(returnValue(Collections.emptyList()));
oneOf(db).endTransaction(txn5);
}});
ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor,
@@ -400,6 +565,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
assertTrue(txn2.isComplete());
assertTrue(txn3.isComplete());
assertTrue(txn4.isComplete());
assertTrue(txn5.isComplete());
}
@Test
@@ -429,6 +595,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).mergeMessageMetadata(txn1, messageId, metadata);
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
@@ -548,6 +715,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).mergeMessageMetadata(txn1, messageId, metadata);
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
@@ -792,6 +960,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).mergeMessageMetadata(txn1, messageId, metadata);
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);
@@ -815,6 +984,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
will(returnValue(metadata));
// Deliver message 1
oneOf(hook).incomingMessage(txn2, message1, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn2, messageId1);
will(returnValue(raw));
oneOf(db).setMessageState(txn2, messageId1, DELIVERED);
@@ -838,6 +1008,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
will(returnValue(metadata));
// Deliver message 2
oneOf(hook).incomingMessage(txn3, message2, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn3, messageId2);
will(returnValue(raw));
oneOf(db).setMessageState(txn3, messageId2, DELIVERED);
@@ -890,6 +1061,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
will(returnValue(metadata));
// Deliver message 4
oneOf(hook).incomingMessage(txn6, message4, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn6, messageId4);
will(returnValue(raw));
oneOf(db).setMessageState(txn6, messageId4, DELIVERED);
@@ -947,6 +1119,7 @@ public class ValidationManagerImplTest extends BriarTestCase {
oneOf(db).mergeMessageMetadata(txn1, messageId, metadata);
// Deliver the message
oneOf(hook).incomingMessage(txn1, message, metadata);
will(returnValue(false));
oneOf(db).getRawMessage(txn1, messageId);
will(returnValue(raw));
oneOf(db).setMessageState(txn1, messageId, DELIVERED);