mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-11 18:29:05 +01:00
Remove broken and deprecated MessageQueue as it is not needed anymore
Closes #308
This commit is contained in:
@@ -1,72 +0,0 @@
|
||||
package org.briarproject.briar.api.client;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.db.Metadata;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.ClientId;
|
||||
import org.briarproject.bramble.api.sync.Group;
|
||||
import org.briarproject.bramble.api.sync.InvalidMessageException;
|
||||
import org.briarproject.bramble.api.sync.MessageContext;
|
||||
|
||||
@Deprecated
|
||||
@NotNullByDefault
|
||||
public interface MessageQueueManager {
|
||||
|
||||
/**
|
||||
* The key used for storing the queue's state in the group metadata.
|
||||
*/
|
||||
String QUEUE_STATE_KEY = "queueState";
|
||||
|
||||
/**
|
||||
* Sends a message using the given queue.
|
||||
*/
|
||||
QueueMessage sendMessage(Transaction txn, Group queue, long timestamp,
|
||||
byte[] body, Metadata meta) throws DbException;
|
||||
|
||||
/**
|
||||
* Sets the message validator for the given client.
|
||||
*/
|
||||
void registerMessageValidator(ClientId c, QueueMessageValidator v);
|
||||
|
||||
/**
|
||||
* Sets the incoming message hook for the given client. The hook will be
|
||||
* called once for each incoming message that passes validation. Messages
|
||||
* are passed to the hook in order.
|
||||
*/
|
||||
void registerIncomingMessageHook(ClientId c, IncomingQueueMessageHook hook);
|
||||
|
||||
@Deprecated
|
||||
interface QueueMessageValidator {
|
||||
|
||||
/**
|
||||
* Validates the given message and returns its metadata and
|
||||
* dependencies.
|
||||
*/
|
||||
MessageContext validateMessage(QueueMessage q, Group g)
|
||||
throws InvalidMessageException;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
interface IncomingQueueMessageHook {
|
||||
|
||||
/**
|
||||
* Called once for each incoming message that passes validation.
|
||||
* Messages are passed to the hook in order.
|
||||
*
|
||||
* @throws DbException Should only be used for real database errors.
|
||||
* If this is thrown, delivery will be attempted again at next startup,
|
||||
* whereas if an InvalidMessageException is thrown,
|
||||
* the message will be permanently invalidated.
|
||||
* @throws InvalidMessageException for any non-database error
|
||||
* that occurs while handling remotely created data.
|
||||
* This includes errors that occur while handling locally created data
|
||||
* in a context controlled by remotely created data
|
||||
* (for example, parsing the metadata of a dependency
|
||||
* of an incoming message).
|
||||
* Never rethrow DbException as InvalidMessageException!
|
||||
*/
|
||||
void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
|
||||
throws DbException, InvalidMessageException;
|
||||
}
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
package org.briarproject.briar.api.client;
|
||||
|
||||
import org.briarproject.bramble.api.event.Event;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Deprecated
|
||||
@NotNullByDefault
|
||||
public interface ProtocolEngine<A, S, M> {
|
||||
|
||||
StateUpdate<S, M> onLocalAction(S localState, A action);
|
||||
|
||||
StateUpdate<S, M> onMessageReceived(S localState, M received);
|
||||
|
||||
StateUpdate<S, M> onMessageDelivered(S localState, M delivered);
|
||||
|
||||
class StateUpdate<S, M> {
|
||||
public final boolean deleteMessage;
|
||||
public final boolean deleteState;
|
||||
public final S localState;
|
||||
public final List<M> toSend;
|
||||
public final List<Event> toBroadcast;
|
||||
|
||||
/**
|
||||
* This class represents an update of the local protocol state.
|
||||
* It only shows how the state should be updated,
|
||||
* but does not carry out the updates on its own.
|
||||
*
|
||||
* @param deleteMessage whether to delete the message that triggered
|
||||
* the state update. This will be ignored for
|
||||
* {@link ProtocolEngine#onLocalAction}.
|
||||
* @param deleteState whether to delete the localState {@link S}
|
||||
* @param localState the new local state
|
||||
* @param toSend a list of messages to be sent as part of the
|
||||
* state update
|
||||
* @param toBroadcast a list of events to broadcast as result of the
|
||||
* state update
|
||||
*/
|
||||
public StateUpdate(boolean deleteMessage, boolean deleteState,
|
||||
S localState, List<M> toSend, List<Event> toBroadcast) {
|
||||
|
||||
this.deleteMessage = deleteMessage;
|
||||
this.deleteState = deleteState;
|
||||
this.localState = localState;
|
||||
this.toSend = toSend;
|
||||
this.toBroadcast = toBroadcast;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
package org.briarproject.briar.api.client;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
|
||||
@Deprecated
|
||||
@NotNullByDefault
|
||||
public class QueueMessage extends Message {
|
||||
|
||||
public static final int QUEUE_MESSAGE_HEADER_LENGTH =
|
||||
MESSAGE_HEADER_LENGTH + 8;
|
||||
public static final int MAX_QUEUE_MESSAGE_BODY_LENGTH =
|
||||
MAX_MESSAGE_BODY_LENGTH - 8;
|
||||
|
||||
private final long queuePosition;
|
||||
|
||||
public QueueMessage(MessageId id, GroupId groupId, long timestamp,
|
||||
long queuePosition, byte[] raw) {
|
||||
super(id, groupId, timestamp, raw);
|
||||
this.queuePosition = queuePosition;
|
||||
}
|
||||
|
||||
public long getQueuePosition() {
|
||||
return queuePosition;
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package org.briarproject.briar.api.client;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
@Deprecated
|
||||
@NotNullByDefault
|
||||
public interface QueueMessageFactory {
|
||||
|
||||
QueueMessage createMessage(GroupId groupId, long timestamp,
|
||||
long queuePosition, byte[] body);
|
||||
|
||||
QueueMessage createMessage(MessageId id, byte[] raw);
|
||||
}
|
||||
@@ -13,18 +13,14 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.InvalidMessageException;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook;
|
||||
import org.briarproject.briar.api.client.MessageQueueManager.IncomingQueueMessageHook;
|
||||
import org.briarproject.briar.api.client.QueueMessage;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
|
||||
IncomingQueueMessageHook {
|
||||
public abstract class BdfIncomingMessageHook implements IncomingMessageHook {
|
||||
|
||||
protected final DatabaseComponent db;
|
||||
protected final ClientHelper clientHelper;
|
||||
@@ -67,16 +63,6 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
|
||||
throws DbException, InvalidMessageException {
|
||||
try {
|
||||
incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH);
|
||||
} catch (FormatException e) {
|
||||
throw new InvalidMessageException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean incomingMessage(Transaction txn, Message m, Metadata meta,
|
||||
int headerLength) throws DbException, FormatException {
|
||||
byte[] raw = m.getRaw();
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
package org.briarproject.briar.client;
|
||||
|
||||
import org.briarproject.bramble.api.FormatException;
|
||||
import org.briarproject.bramble.api.client.BdfMessageContext;
|
||||
import org.briarproject.bramble.api.client.ClientHelper;
|
||||
import org.briarproject.bramble.api.data.BdfList;
|
||||
import org.briarproject.bramble.api.data.MetadataEncoder;
|
||||
import org.briarproject.bramble.api.db.Metadata;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.Group;
|
||||
import org.briarproject.bramble.api.sync.InvalidMessageException;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageContext;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
import org.briarproject.briar.api.client.MessageQueueManager.QueueMessageValidator;
|
||||
import org.briarproject.briar.api.client.QueueMessage;
|
||||
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
import static org.briarproject.bramble.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE;
|
||||
import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
|
||||
|
||||
@Deprecated
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public abstract class BdfQueueMessageValidator
|
||||
implements QueueMessageValidator {
|
||||
|
||||
protected static final Logger LOG =
|
||||
Logger.getLogger(BdfQueueMessageValidator.class.getName());
|
||||
|
||||
protected final ClientHelper clientHelper;
|
||||
protected final MetadataEncoder metadataEncoder;
|
||||
protected final Clock clock;
|
||||
|
||||
protected BdfQueueMessageValidator(ClientHelper clientHelper,
|
||||
MetadataEncoder metadataEncoder, Clock clock) {
|
||||
this.clientHelper = clientHelper;
|
||||
this.metadataEncoder = metadataEncoder;
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
protected abstract BdfMessageContext validateMessage(Message m, Group g,
|
||||
BdfList body) throws InvalidMessageException, FormatException;
|
||||
|
||||
@Override
|
||||
public MessageContext validateMessage(QueueMessage q, Group g)
|
||||
throws InvalidMessageException {
|
||||
// Reject the message if it's too far in the future
|
||||
long now = clock.currentTimeMillis();
|
||||
if (q.getTimestamp() - now > MAX_CLOCK_DIFFERENCE) {
|
||||
throw new InvalidMessageException(
|
||||
"Timestamp is too far in the future");
|
||||
}
|
||||
byte[] raw = q.getRaw();
|
||||
if (raw.length <= QUEUE_MESSAGE_HEADER_LENGTH) {
|
||||
throw new InvalidMessageException("Message is too short");
|
||||
}
|
||||
try {
|
||||
BdfList body = clientHelper.toList(raw, QUEUE_MESSAGE_HEADER_LENGTH,
|
||||
raw.length - QUEUE_MESSAGE_HEADER_LENGTH);
|
||||
BdfMessageContext result = validateMessage(q, g, body);
|
||||
Metadata meta = metadataEncoder.encode(result.getDictionary());
|
||||
return new MessageContext(meta, result.getDependencies());
|
||||
} catch (FormatException e) {
|
||||
throw new InvalidMessageException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,14 +1,6 @@
|
||||
package org.briarproject.briar.client;
|
||||
|
||||
import org.briarproject.bramble.api.client.ClientHelper;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager;
|
||||
import org.briarproject.briar.api.client.MessageQueueManager;
|
||||
import org.briarproject.briar.api.client.MessageTracker;
|
||||
import org.briarproject.briar.api.client.QueueMessageFactory;
|
||||
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import dagger.Module;
|
||||
import dagger.Provides;
|
||||
@@ -16,21 +8,6 @@ import dagger.Provides;
|
||||
@Module
|
||||
public class BriarClientModule {
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
MessageQueueManager provideMessageQueueManager(DatabaseComponent db,
|
||||
ClientHelper clientHelper, QueueMessageFactory queueMessageFactory,
|
||||
ValidationManager validationManager) {
|
||||
return new MessageQueueManagerImpl(db, clientHelper,
|
||||
queueMessageFactory, validationManager);
|
||||
}
|
||||
|
||||
@Provides
|
||||
QueueMessageFactory provideQueueMessageFactory(
|
||||
MessageFactory messageFactory) {
|
||||
return new QueueMessageFactoryImpl(messageFactory);
|
||||
}
|
||||
|
||||
@Provides
|
||||
MessageTracker provideMessageTracker(MessageTrackerImpl messageTracker) {
|
||||
return messageTracker;
|
||||
|
||||
@@ -1,259 +0,0 @@
|
||||
package org.briarproject.briar.client;
|
||||
|
||||
import org.briarproject.bramble.api.FormatException;
|
||||
import org.briarproject.bramble.api.client.ClientHelper;
|
||||
import org.briarproject.bramble.api.data.BdfDictionary;
|
||||
import org.briarproject.bramble.api.data.BdfList;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.db.Metadata;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.ClientId;
|
||||
import org.briarproject.bramble.api.sync.Group;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.InvalidMessageException;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageContext;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager.MessageValidator;
|
||||
import org.briarproject.bramble.util.ByteUtils;
|
||||
import org.briarproject.briar.api.client.MessageQueueManager;
|
||||
import org.briarproject.briar.api.client.QueueMessage;
|
||||
import org.briarproject.briar.api.client.QueueMessageFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TreeMap;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
class MessageQueueManagerImpl implements MessageQueueManager {
|
||||
|
||||
private static final String OUTGOING_POSITION_KEY = "nextOut";
|
||||
private static final String INCOMING_POSITION_KEY = "nextIn";
|
||||
private static final String PENDING_MESSAGES_KEY = "pending";
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(MessageQueueManagerImpl.class.getName());
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final ClientHelper clientHelper;
|
||||
private final QueueMessageFactory queueMessageFactory;
|
||||
private final ValidationManager validationManager;
|
||||
|
||||
@Inject
|
||||
MessageQueueManagerImpl(DatabaseComponent db, ClientHelper clientHelper,
|
||||
QueueMessageFactory queueMessageFactory,
|
||||
ValidationManager validationManager) {
|
||||
this.db = db;
|
||||
this.clientHelper = clientHelper;
|
||||
this.queueMessageFactory = queueMessageFactory;
|
||||
this.validationManager = validationManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueMessage sendMessage(Transaction txn, Group queue,
|
||||
long timestamp, byte[] body, Metadata meta) throws DbException {
|
||||
QueueState queueState = loadQueueState(txn, queue.getId());
|
||||
long queuePosition = queueState.outgoingPosition;
|
||||
queueState.outgoingPosition++;
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Sending message with position " + queuePosition);
|
||||
saveQueueState(txn, queue.getId(), queueState);
|
||||
QueueMessage q = queueMessageFactory.createMessage(queue.getId(),
|
||||
timestamp, queuePosition, body);
|
||||
db.addLocalMessage(txn, q, meta, true);
|
||||
return q;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerMessageValidator(ClientId c, QueueMessageValidator v) {
|
||||
validationManager.registerMessageValidator(c,
|
||||
new DelegatingMessageValidator(v));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerIncomingMessageHook(ClientId c,
|
||||
IncomingQueueMessageHook hook) {
|
||||
validationManager.registerIncomingMessageHook(c,
|
||||
new DelegatingIncomingMessageHook(hook));
|
||||
}
|
||||
|
||||
private QueueState loadQueueState(Transaction txn, GroupId g)
|
||||
throws DbException {
|
||||
try {
|
||||
TreeMap<Long, MessageId> pending = new TreeMap<>();
|
||||
Metadata groupMeta = db.getGroupMetadata(txn, g);
|
||||
byte[] raw = groupMeta.get(QUEUE_STATE_KEY);
|
||||
if (raw == null) return new QueueState(0, 0, pending);
|
||||
BdfDictionary d = clientHelper.toDictionary(raw, 0, raw.length);
|
||||
long outgoingPosition = d.getLong(OUTGOING_POSITION_KEY);
|
||||
long incomingPosition = d.getLong(INCOMING_POSITION_KEY);
|
||||
BdfList pendingList = d.getList(PENDING_MESSAGES_KEY);
|
||||
for (int i = 0; i < pendingList.size(); i++) {
|
||||
BdfList item = pendingList.getList(i);
|
||||
if (item.size() != 2) throw new FormatException();
|
||||
pending.put(item.getLong(0), new MessageId(item.getRaw(1)));
|
||||
}
|
||||
return new QueueState(outgoingPosition, incomingPosition, pending);
|
||||
} catch (FormatException e) {
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void saveQueueState(Transaction txn, GroupId g,
|
||||
QueueState queueState) throws DbException {
|
||||
try {
|
||||
BdfDictionary d = new BdfDictionary();
|
||||
d.put(OUTGOING_POSITION_KEY, queueState.outgoingPosition);
|
||||
d.put(INCOMING_POSITION_KEY, queueState.incomingPosition);
|
||||
BdfList pendingList = new BdfList();
|
||||
for (Entry<Long, MessageId> e : queueState.pending.entrySet())
|
||||
pendingList.add(BdfList.of(e.getKey(), e.getValue()));
|
||||
d.put(PENDING_MESSAGES_KEY, pendingList);
|
||||
Metadata groupMeta = new Metadata();
|
||||
groupMeta.put(QUEUE_STATE_KEY, clientHelper.toByteArray(d));
|
||||
db.mergeGroupMetadata(txn, g, groupMeta);
|
||||
} catch (FormatException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static class QueueState {
|
||||
|
||||
private long outgoingPosition, incomingPosition;
|
||||
private final TreeMap<Long, MessageId> pending;
|
||||
|
||||
private QueueState(long outgoingPosition, long incomingPosition,
|
||||
TreeMap<Long, MessageId> pending) {
|
||||
this.outgoingPosition = outgoingPosition;
|
||||
this.incomingPosition = incomingPosition;
|
||||
this.pending = pending;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
MessageId popIncomingMessageId() {
|
||||
Iterator<Entry<Long, MessageId>> it = pending.entrySet().iterator();
|
||||
if (!it.hasNext()) {
|
||||
LOG.info("No pending messages");
|
||||
return null;
|
||||
}
|
||||
Entry<Long, MessageId> e = it.next();
|
||||
if (!e.getKey().equals(incomingPosition)) {
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info("First pending message is " + e.getKey() + ", "
|
||||
+ " expecting " + incomingPosition);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Removing pending message " + e.getKey());
|
||||
it.remove();
|
||||
incomingPosition++;
|
||||
return e.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
@NotNullByDefault
|
||||
private static class DelegatingMessageValidator
|
||||
implements MessageValidator {
|
||||
|
||||
private final QueueMessageValidator delegate;
|
||||
|
||||
private DelegatingMessageValidator(QueueMessageValidator delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageContext validateMessage(Message m, Group g)
|
||||
throws InvalidMessageException {
|
||||
byte[] raw = m.getRaw();
|
||||
if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH)
|
||||
throw new InvalidMessageException();
|
||||
long queuePosition = ByteUtils.readUint64(raw,
|
||||
MESSAGE_HEADER_LENGTH);
|
||||
if (queuePosition < 0) throw new InvalidMessageException();
|
||||
QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(),
|
||||
m.getTimestamp(), queuePosition, raw);
|
||||
return delegate.validateMessage(q, g);
|
||||
}
|
||||
}
|
||||
|
||||
@NotNullByDefault
|
||||
private class DelegatingIncomingMessageHook implements IncomingMessageHook {
|
||||
|
||||
private final IncomingQueueMessageHook delegate;
|
||||
|
||||
private DelegatingIncomingMessageHook(
|
||||
IncomingQueueMessageHook delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean incomingMessage(Transaction txn, Message m,
|
||||
Metadata meta) throws DbException, InvalidMessageException {
|
||||
long queuePosition = ByteUtils.readUint64(m.getRaw(),
|
||||
MESSAGE_HEADER_LENGTH);
|
||||
QueueState queueState = loadQueueState(txn, m.getGroupId());
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info("Received message with position "
|
||||
+ queuePosition + ", expecting "
|
||||
+ queueState.incomingPosition);
|
||||
}
|
||||
if (queuePosition < queueState.incomingPosition) {
|
||||
// A message with this queue position has already been seen
|
||||
LOG.warning("Deleting message with duplicate position");
|
||||
db.deleteMessage(txn, m.getId());
|
||||
db.deleteMessageMetadata(txn, m.getId());
|
||||
} else if (queuePosition > queueState.incomingPosition) {
|
||||
// The message is out of order, add it to the pending list
|
||||
LOG.info("Message is out of order, adding to pending list");
|
||||
queueState.pending.put(queuePosition, m.getId());
|
||||
saveQueueState(txn, m.getGroupId(), queueState);
|
||||
} else {
|
||||
// The message is in order
|
||||
LOG.info("Message is in order, delivering");
|
||||
QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(),
|
||||
m.getTimestamp(), queuePosition, m.getRaw());
|
||||
queueState.incomingPosition++;
|
||||
// Collect any consecutive messages
|
||||
List<MessageId> consecutive = new ArrayList<>();
|
||||
MessageId next;
|
||||
while ((next = queueState.popIncomingMessageId()) != null)
|
||||
consecutive.add(next);
|
||||
// Save the queue state before passing control to the delegate
|
||||
saveQueueState(txn, m.getGroupId(), queueState);
|
||||
// Deliver the messages to the delegate
|
||||
delegate.incomingMessage(txn, q, meta);
|
||||
for (MessageId id : consecutive) {
|
||||
byte[] raw = db.getRawMessage(txn, id);
|
||||
if (raw == null) throw new DbException();
|
||||
meta = db.getMessageMetadata(txn, id);
|
||||
q = queueMessageFactory.createMessage(id, raw);
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info("Delivering pending message with position "
|
||||
+ q.getQueuePosition());
|
||||
}
|
||||
delegate.incomingMessage(txn, q, meta);
|
||||
}
|
||||
}
|
||||
// message queues are only useful for groups with two members
|
||||
// so messages don't need to be shared
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,60 +0,0 @@
|
||||
package org.briarproject.briar.client;
|
||||
|
||||
import org.briarproject.bramble.api.UniqueId;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.util.ByteUtils;
|
||||
import org.briarproject.briar.api.client.QueueMessage;
|
||||
import org.briarproject.briar.api.client.QueueMessageFactory;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
import static org.briarproject.bramble.util.ByteUtils.INT_64_BYTES;
|
||||
import static org.briarproject.briar.api.client.QueueMessage.MAX_QUEUE_MESSAGE_BODY_LENGTH;
|
||||
import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
class QueueMessageFactoryImpl implements QueueMessageFactory {
|
||||
|
||||
private final MessageFactory messageFactory;
|
||||
|
||||
@Inject
|
||||
QueueMessageFactoryImpl(MessageFactory messageFactory) {
|
||||
this.messageFactory = messageFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueMessage createMessage(GroupId groupId, long timestamp,
|
||||
long queuePosition, byte[] body) {
|
||||
if (body.length > MAX_QUEUE_MESSAGE_BODY_LENGTH)
|
||||
throw new IllegalArgumentException();
|
||||
byte[] messageBody = new byte[INT_64_BYTES + body.length];
|
||||
ByteUtils.writeUint64(queuePosition, messageBody, 0);
|
||||
System.arraycopy(body, 0, messageBody, INT_64_BYTES, body.length);
|
||||
Message m = messageFactory.createMessage(groupId, timestamp,
|
||||
messageBody);
|
||||
return new QueueMessage(m.getId(), groupId, timestamp, queuePosition,
|
||||
m.getRaw());
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueMessage createMessage(MessageId id, byte[] raw) {
|
||||
if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH)
|
||||
throw new IllegalArgumentException();
|
||||
if (raw.length > MAX_MESSAGE_LENGTH)
|
||||
throw new IllegalArgumentException();
|
||||
byte[] groupId = new byte[UniqueId.LENGTH];
|
||||
System.arraycopy(raw, 0, groupId, 0, UniqueId.LENGTH);
|
||||
long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH);
|
||||
long queuePosition = ByteUtils.readUint64(raw, MESSAGE_HEADER_LENGTH);
|
||||
return new QueueMessage(id, new GroupId(groupId), timestamp,
|
||||
queuePosition, raw);
|
||||
}
|
||||
}
|
||||
@@ -1,566 +0,0 @@
|
||||
package org.briarproject.briar.client;
|
||||
|
||||
import org.briarproject.bramble.api.client.ClientHelper;
|
||||
import org.briarproject.bramble.api.data.BdfDictionary;
|
||||
import org.briarproject.bramble.api.data.BdfList;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.Metadata;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.sync.ClientId;
|
||||
import org.briarproject.bramble.api.sync.Group;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.InvalidMessageException;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageContext;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook;
|
||||
import org.briarproject.bramble.api.sync.ValidationManager.MessageValidator;
|
||||
import org.briarproject.bramble.test.CaptureArgumentAction;
|
||||
import org.briarproject.bramble.test.TestUtils;
|
||||
import org.briarproject.bramble.util.ByteUtils;
|
||||
import org.briarproject.briar.api.client.MessageQueueManager.IncomingQueueMessageHook;
|
||||
import org.briarproject.briar.api.client.MessageQueueManager.QueueMessageValidator;
|
||||
import org.briarproject.briar.api.client.QueueMessage;
|
||||
import org.briarproject.briar.api.client.QueueMessageFactory;
|
||||
import org.briarproject.briar.test.BriarTestCase;
|
||||
import org.hamcrest.Description;
|
||||
import org.jmock.Expectations;
|
||||
import org.jmock.Mockery;
|
||||
import org.jmock.api.Action;
|
||||
import org.jmock.api.Invocation;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
import static org.briarproject.bramble.test.TestUtils.getClientId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getGroup;
|
||||
import static org.briarproject.briar.api.client.MessageQueueManager.QUEUE_STATE_KEY;
|
||||
import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class MessageQueueManagerImplTest extends BriarTestCase {
|
||||
|
||||
private final ClientId clientId = getClientId();
|
||||
private final Group group = getGroup(clientId);
|
||||
private final GroupId groupId = group.getId();
|
||||
private final long timestamp = System.currentTimeMillis();
|
||||
|
||||
@Test
|
||||
public void testSendingMessages() throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
|
||||
Transaction txn = new Transaction(null, false);
|
||||
byte[] body = new byte[123];
|
||||
Metadata groupMetadata = new Metadata();
|
||||
Metadata messageMetadata = new Metadata();
|
||||
Metadata groupMetadata1 = new Metadata();
|
||||
byte[] queueState = new byte[123];
|
||||
groupMetadata1.put(QUEUE_STATE_KEY, queueState);
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
// First message: queue state does not exist
|
||||
oneOf(db).getGroupMetadata(txn, groupId);
|
||||
will(returnValue(groupMetadata));
|
||||
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
|
||||
will(new EncodeQueueStateAction(1L, 0L, new BdfList()));
|
||||
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
|
||||
with(any(Metadata.class)));
|
||||
oneOf(queueMessageFactory).createMessage(groupId, timestamp, 0L,
|
||||
body);
|
||||
will(new CreateMessageAction());
|
||||
oneOf(db).addLocalMessage(with(txn), with(any(QueueMessage.class)),
|
||||
with(messageMetadata), with(true));
|
||||
// Second message: queue state exists
|
||||
oneOf(db).getGroupMetadata(txn, groupId);
|
||||
will(returnValue(groupMetadata1));
|
||||
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
|
||||
will(new DecodeQueueStateAction(1L, 0L, new BdfList()));
|
||||
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
|
||||
will(new EncodeQueueStateAction(2L, 0L, new BdfList()));
|
||||
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
|
||||
with(any(Metadata.class)));
|
||||
oneOf(queueMessageFactory).createMessage(groupId, timestamp, 1L,
|
||||
body);
|
||||
will(new CreateMessageAction());
|
||||
oneOf(db).addLocalMessage(with(txn), with(any(QueueMessage.class)),
|
||||
with(messageMetadata), with(true));
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// First message
|
||||
QueueMessage q = mqm.sendMessage(txn, group, timestamp, body,
|
||||
messageMetadata);
|
||||
assertEquals(groupId, q.getGroupId());
|
||||
assertEquals(timestamp, q.getTimestamp());
|
||||
assertEquals(0L, q.getQueuePosition());
|
||||
assertEquals(QUEUE_MESSAGE_HEADER_LENGTH + body.length, q.getLength());
|
||||
|
||||
// Second message
|
||||
QueueMessage q1 = mqm.sendMessage(txn, group, timestamp, body,
|
||||
messageMetadata);
|
||||
assertEquals(groupId, q1.getGroupId());
|
||||
assertEquals(timestamp, q1.getTimestamp());
|
||||
assertEquals(1L, q1.getQueuePosition());
|
||||
assertEquals(QUEUE_MESSAGE_HEADER_LENGTH + body.length, q1.getLength());
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidatorRejectsShortMessage() throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
|
||||
AtomicReference<MessageValidator> captured = new AtomicReference<>();
|
||||
QueueMessageValidator queueMessageValidator =
|
||||
context.mock(QueueMessageValidator.class);
|
||||
// The message is too short to be a valid queue message
|
||||
MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH - 1];
|
||||
Message message = new Message(messageId, groupId, timestamp, raw);
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(validationManager).registerMessageValidator(with(clientId),
|
||||
with(any(MessageValidator.class)));
|
||||
will(new CaptureArgumentAction<>(captured,
|
||||
MessageValidator.class, 1));
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// Capture the delegating message validator
|
||||
mqm.registerMessageValidator(clientId, queueMessageValidator);
|
||||
MessageValidator delegate = captured.get();
|
||||
assertNotNull(delegate);
|
||||
// The message should be invalid
|
||||
try {
|
||||
delegate.validateMessage(message, group);
|
||||
fail();
|
||||
} catch (InvalidMessageException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidatorRejectsNegativeQueuePosition() throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
|
||||
AtomicReference<MessageValidator> captured = new AtomicReference<>();
|
||||
QueueMessageValidator queueMessageValidator =
|
||||
context.mock(QueueMessageValidator.class);
|
||||
// The message has a negative queue position
|
||||
MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
|
||||
for (int i = 0; i < 8; i++)
|
||||
raw[MESSAGE_HEADER_LENGTH + i] = (byte) 0xFF;
|
||||
Message message = new Message(messageId, groupId, timestamp, raw);
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(validationManager).registerMessageValidator(with(clientId),
|
||||
with(any(MessageValidator.class)));
|
||||
will(new CaptureArgumentAction<>(captured,
|
||||
MessageValidator.class, 1));
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// Capture the delegating message validator
|
||||
mqm.registerMessageValidator(clientId, queueMessageValidator);
|
||||
MessageValidator delegate = captured.get();
|
||||
assertNotNull(delegate);
|
||||
// The message should be invalid
|
||||
try {
|
||||
delegate.validateMessage(message, group);
|
||||
fail();
|
||||
} catch (InvalidMessageException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidatorDelegatesValidMessage() throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
|
||||
AtomicReference<MessageValidator> captured = new AtomicReference<>();
|
||||
QueueMessageValidator queueMessageValidator =
|
||||
context.mock(QueueMessageValidator.class);
|
||||
Metadata metadata = new Metadata();
|
||||
MessageContext messageContext =
|
||||
new MessageContext(metadata);
|
||||
// The message is valid, with a queue position of zero
|
||||
MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
|
||||
Message message = new Message(messageId, groupId, timestamp, raw);
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(validationManager).registerMessageValidator(with(clientId),
|
||||
with(any(MessageValidator.class)));
|
||||
will(new CaptureArgumentAction<>(captured,
|
||||
MessageValidator.class, 1));
|
||||
// The message should be delegated
|
||||
oneOf(queueMessageValidator).validateMessage(
|
||||
with(any(QueueMessage.class)), with(group));
|
||||
will(returnValue(messageContext));
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// Capture the delegating message validator
|
||||
mqm.registerMessageValidator(clientId, queueMessageValidator);
|
||||
MessageValidator delegate = captured.get();
|
||||
assertNotNull(delegate);
|
||||
// The message should be valid and the metadata should be returned
|
||||
assertSame(messageContext, delegate.validateMessage(message, group));
|
||||
assertSame(metadata, messageContext.getMetadata());
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncomingMessageHookDeletesDuplicateMessage()
|
||||
throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
AtomicReference<IncomingMessageHook> captured = new AtomicReference<>();
|
||||
IncomingQueueMessageHook incomingQueueMessageHook =
|
||||
context.mock(IncomingQueueMessageHook.class);
|
||||
|
||||
Transaction txn = new Transaction(null, false);
|
||||
Metadata groupMetadata = new Metadata();
|
||||
byte[] queueState = new byte[123];
|
||||
groupMetadata.put(QUEUE_STATE_KEY, queueState);
|
||||
// The message has queue position 0
|
||||
MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
|
||||
Message message = new Message(messageId, groupId, timestamp, raw);
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
|
||||
with(any(IncomingMessageHook.class)));
|
||||
will(new CaptureArgumentAction<>(captured,
|
||||
IncomingMessageHook.class, 1));
|
||||
oneOf(db).getGroupMetadata(txn, groupId);
|
||||
will(returnValue(groupMetadata));
|
||||
// Queue position 1 is expected
|
||||
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
|
||||
will(new DecodeQueueStateAction(0L, 1L, new BdfList()));
|
||||
// The message and its metadata should be deleted
|
||||
oneOf(db).deleteMessage(txn, messageId);
|
||||
oneOf(db).deleteMessageMetadata(txn, messageId);
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// Capture the delegating incoming message hook
|
||||
mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook);
|
||||
IncomingMessageHook delegate = captured.get();
|
||||
assertNotNull(delegate);
|
||||
// Pass the message to the hook
|
||||
delegate.incomingMessage(txn, message, new Metadata());
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncomingMessageHookAddsOutOfOrderMessageToPendingList()
|
||||
throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
AtomicReference<IncomingMessageHook> captured = new AtomicReference<>();
|
||||
IncomingQueueMessageHook incomingQueueMessageHook =
|
||||
context.mock(IncomingQueueMessageHook.class);
|
||||
|
||||
Transaction txn = new Transaction(null, false);
|
||||
Metadata groupMetadata = new Metadata();
|
||||
byte[] queueState = new byte[123];
|
||||
groupMetadata.put(QUEUE_STATE_KEY, queueState);
|
||||
// The message has queue position 1
|
||||
MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
|
||||
ByteUtils.writeUint64(1L, raw, MESSAGE_HEADER_LENGTH);
|
||||
Message message = new Message(messageId, groupId, timestamp, raw);
|
||||
BdfList pending = BdfList.of(BdfList.of(1L, messageId));
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
|
||||
with(any(IncomingMessageHook.class)));
|
||||
will(new CaptureArgumentAction<>(captured,
|
||||
IncomingMessageHook.class, 1));
|
||||
oneOf(db).getGroupMetadata(txn, groupId);
|
||||
will(returnValue(groupMetadata));
|
||||
// Queue position 0 is expected
|
||||
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
|
||||
will(new DecodeQueueStateAction(0L, 0L, new BdfList()));
|
||||
// The message should be added to the pending list
|
||||
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
|
||||
will(new EncodeQueueStateAction(0L, 0L, pending));
|
||||
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
|
||||
with(any(Metadata.class)));
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// Capture the delegating incoming message hook
|
||||
mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook);
|
||||
IncomingMessageHook delegate = captured.get();
|
||||
assertNotNull(delegate);
|
||||
// Pass the message to the hook
|
||||
delegate.incomingMessage(txn, message, new Metadata());
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncomingMessageHookDelegatesInOrderMessage()
|
||||
throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
AtomicReference<IncomingMessageHook> captured = new AtomicReference<>();
|
||||
IncomingQueueMessageHook incomingQueueMessageHook =
|
||||
context.mock(IncomingQueueMessageHook.class);
|
||||
|
||||
Transaction txn = new Transaction(null, false);
|
||||
Metadata groupMetadata = new Metadata();
|
||||
byte[] queueState = new byte[123];
|
||||
groupMetadata.put(QUEUE_STATE_KEY, queueState);
|
||||
// The message has queue position 0
|
||||
MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
|
||||
Message message = new Message(messageId, groupId, timestamp, raw);
|
||||
Metadata messageMetadata = new Metadata();
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
|
||||
with(any(IncomingMessageHook.class)));
|
||||
will(new CaptureArgumentAction<>(captured,
|
||||
IncomingMessageHook.class, 1));
|
||||
oneOf(db).getGroupMetadata(txn, groupId);
|
||||
will(returnValue(groupMetadata));
|
||||
// Queue position 0 is expected
|
||||
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
|
||||
will(new DecodeQueueStateAction(0L, 0L, new BdfList()));
|
||||
// Queue position 1 should be expected next
|
||||
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
|
||||
will(new EncodeQueueStateAction(0L, 1L, new BdfList()));
|
||||
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
|
||||
with(any(Metadata.class)));
|
||||
// The message should be delegated
|
||||
oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
|
||||
with(any(QueueMessage.class)), with(messageMetadata));
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// Capture the delegating incoming message hook
|
||||
mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook);
|
||||
IncomingMessageHook delegate = captured.get();
|
||||
assertNotNull(delegate);
|
||||
// Pass the message to the hook
|
||||
delegate.incomingMessage(txn, message, messageMetadata);
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncomingMessageHookRetrievesPendingMessage()
|
||||
throws Exception {
|
||||
Mockery context = new Mockery();
|
||||
DatabaseComponent db = context.mock(DatabaseComponent.class);
|
||||
ClientHelper clientHelper = context.mock(ClientHelper.class);
|
||||
QueueMessageFactory queueMessageFactory =
|
||||
context.mock(QueueMessageFactory.class);
|
||||
ValidationManager validationManager =
|
||||
context.mock(ValidationManager.class);
|
||||
AtomicReference<IncomingMessageHook> captured = new AtomicReference<>();
|
||||
IncomingQueueMessageHook incomingQueueMessageHook =
|
||||
context.mock(IncomingQueueMessageHook.class);
|
||||
|
||||
Transaction txn = new Transaction(null, false);
|
||||
Metadata groupMetadata = new Metadata();
|
||||
byte[] queueState = new byte[123];
|
||||
groupMetadata.put(QUEUE_STATE_KEY, queueState);
|
||||
// The message has queue position 0
|
||||
MessageId messageId = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
|
||||
Message message = new Message(messageId, groupId, timestamp, raw);
|
||||
Metadata messageMetadata = new Metadata();
|
||||
// Queue position 1 is pending
|
||||
MessageId messageId1 = new MessageId(TestUtils.getRandomId());
|
||||
byte[] raw1 = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
|
||||
QueueMessage message1 = new QueueMessage(messageId1, groupId,
|
||||
timestamp, 1L, raw1);
|
||||
Metadata messageMetadata1 = new Metadata();
|
||||
BdfList pending = BdfList.of(BdfList.of(1L, messageId1));
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
|
||||
with(any(IncomingMessageHook.class)));
|
||||
will(new CaptureArgumentAction<>(captured,
|
||||
IncomingMessageHook.class, 1));
|
||||
oneOf(db).getGroupMetadata(txn, groupId);
|
||||
will(returnValue(groupMetadata));
|
||||
// Queue position 0 is expected, position 1 is pending
|
||||
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
|
||||
will(new DecodeQueueStateAction(0L, 0L, pending));
|
||||
// Queue position 2 should be expected next
|
||||
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
|
||||
will(new EncodeQueueStateAction(0L, 2L, new BdfList()));
|
||||
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
|
||||
with(any(Metadata.class)));
|
||||
// The new message should be delegated
|
||||
oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
|
||||
with(any(QueueMessage.class)), with(messageMetadata));
|
||||
// The pending message should be retrieved
|
||||
oneOf(db).getRawMessage(txn, messageId1);
|
||||
will(returnValue(raw1));
|
||||
oneOf(db).getMessageMetadata(txn, messageId1);
|
||||
will(returnValue(messageMetadata1));
|
||||
oneOf(queueMessageFactory).createMessage(messageId1, raw1);
|
||||
will(returnValue(message1));
|
||||
// The pending message should be delegated
|
||||
oneOf(incomingQueueMessageHook).incomingMessage(txn, message1,
|
||||
messageMetadata1);
|
||||
}});
|
||||
|
||||
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
|
||||
clientHelper, queueMessageFactory, validationManager);
|
||||
|
||||
// Capture the delegating incoming message hook
|
||||
mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook);
|
||||
IncomingMessageHook delegate = captured.get();
|
||||
assertNotNull(delegate);
|
||||
// Pass the message to the hook
|
||||
delegate.incomingMessage(txn, message, messageMetadata);
|
||||
|
||||
context.assertIsSatisfied();
|
||||
}
|
||||
|
||||
private class EncodeQueueStateAction implements Action {
|
||||
|
||||
private final long outgoingPosition, incomingPosition;
|
||||
private final BdfList pending;
|
||||
|
||||
private EncodeQueueStateAction(long outgoingPosition,
|
||||
long incomingPosition, BdfList pending) {
|
||||
this.outgoingPosition = outgoingPosition;
|
||||
this.incomingPosition = incomingPosition;
|
||||
this.pending = pending;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object invoke(Invocation invocation) throws Throwable {
|
||||
BdfDictionary d = (BdfDictionary) invocation.getParameter(0);
|
||||
assertEquals(outgoingPosition, d.getLong("nextOut").longValue());
|
||||
assertEquals(incomingPosition, d.getLong("nextIn").longValue());
|
||||
assertEquals(pending, d.getList("pending"));
|
||||
return new byte[123];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description.appendText("encodes a queue state");
|
||||
}
|
||||
}
|
||||
|
||||
private class DecodeQueueStateAction implements Action {
|
||||
|
||||
private final long outgoingPosition, incomingPosition;
|
||||
private final BdfList pending;
|
||||
|
||||
private DecodeQueueStateAction(long outgoingPosition,
|
||||
long incomingPosition, BdfList pending) {
|
||||
this.outgoingPosition = outgoingPosition;
|
||||
this.incomingPosition = incomingPosition;
|
||||
this.pending = pending;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object invoke(Invocation invocation) throws Throwable {
|
||||
BdfDictionary d = new BdfDictionary();
|
||||
d.put("nextOut", outgoingPosition);
|
||||
d.put("nextIn", incomingPosition);
|
||||
d.put("pending", pending);
|
||||
return d;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description.appendText("decodes a queue state");
|
||||
}
|
||||
}
|
||||
|
||||
private class CreateMessageAction implements Action {
|
||||
|
||||
@Override
|
||||
public Object invoke(Invocation invocation) throws Throwable {
|
||||
GroupId groupId = (GroupId) invocation.getParameter(0);
|
||||
long timestamp = (Long) invocation.getParameter(1);
|
||||
long queuePosition = (Long) invocation.getParameter(2);
|
||||
byte[] body = (byte[]) invocation.getParameter(3);
|
||||
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH + body.length];
|
||||
MessageId id = new MessageId(TestUtils.getRandomId());
|
||||
return new QueueMessage(id, groupId, timestamp, queuePosition, raw);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description.appendText("creates a message");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user