Merge branch '272-message-queue-flow-control' into 'master'

Save queue state before delivering message. #272

Another attempt to fix #272...

See merge request !125
This commit is contained in:
akwizgran
2016-03-30 16:35:36 +00:00
2 changed files with 38 additions and 22 deletions

View File

@@ -20,7 +20,9 @@ import org.briarproject.api.sync.ValidationManager;
import org.briarproject.api.sync.ValidationManager.IncomingMessageHook; import org.briarproject.api.sync.ValidationManager.IncomingMessageHook;
import org.briarproject.util.ByteUtils; import org.briarproject.util.ByteUtils;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -207,15 +209,21 @@ class MessageQueueManagerImpl implements MessageQueueManager {
queueState.pending.put(queuePosition, m.getId()); queueState.pending.put(queuePosition, m.getId());
saveQueueState(txn, m.getGroupId(), queueState); saveQueueState(txn, m.getGroupId(), queueState);
} else { } else {
// The message is in order, pass it to the delegate // The message is in order
LOG.info("Message is in order, delivering"); LOG.info("Message is in order, delivering");
QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(), QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(),
m.getTimestamp(), queuePosition, m.getRaw()); m.getTimestamp(), queuePosition, m.getRaw());
delegate.incomingMessage(txn, q, meta);
queueState.incomingPosition++; queueState.incomingPosition++;
// Pass any consecutive messages to the delegate // Collect any consecutive messages
MessageId id; List<MessageId> consecutive = new ArrayList<MessageId>();
while ((id = queueState.popIncomingMessageId()) != null) { MessageId next;
while ((next = queueState.popIncomingMessageId()) != null)
consecutive.add(next);
// Save the queue state before passing control to the delegate
saveQueueState(txn, m.getGroupId(), queueState);
// Deliver the messages to the delegate
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id); byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id); meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw); q = queueMessageFactory.createMessage(id, raw);
@@ -225,7 +233,6 @@ class MessageQueueManagerImpl implements MessageQueueManager {
} }
delegate.incomingMessage(txn, q, meta); delegate.incomingMessage(txn, q, meta);
} }
saveQueueState(txn, m.getGroupId(), queueState);
} }
} }
} }

View File

@@ -56,6 +56,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class); context.mock(QueueMessageFactory.class);
final ValidationManager validationManager = final ValidationManager validationManager =
context.mock(ValidationManager.class); context.mock(ValidationManager.class);
final Transaction txn = new Transaction(null, false); final Transaction txn = new Transaction(null, false);
final byte[] body = new byte[123]; final byte[] body = new byte[123];
final Metadata groupMetadata = new Metadata(); final Metadata groupMetadata = new Metadata();
@@ -63,6 +64,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final Metadata groupMetadata1 = new Metadata(); final Metadata groupMetadata1 = new Metadata();
final byte[] queueState = new byte[123]; final byte[] queueState = new byte[123];
groupMetadata1.put(QUEUE_STATE_KEY, queueState); groupMetadata1.put(QUEUE_STATE_KEY, queueState);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// First message: queue state does not exist // First message: queue state does not exist
oneOf(db).getGroupMetadata(txn, groupId); oneOf(db).getGroupMetadata(txn, groupId);
@@ -123,6 +125,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class); context.mock(QueueMessageFactory.class);
final ValidationManager validationManager = final ValidationManager validationManager =
context.mock(ValidationManager.class); context.mock(ValidationManager.class);
final AtomicReference<MessageValidator> captured = final AtomicReference<MessageValidator> captured =
new AtomicReference<MessageValidator>(); new AtomicReference<MessageValidator>();
final QueueMessageValidator queueMessageValidator = final QueueMessageValidator queueMessageValidator =
@@ -131,6 +134,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final MessageId messageId = new MessageId(TestUtils.getRandomId()); final MessageId messageId = new MessageId(TestUtils.getRandomId());
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH - 1]; final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH - 1];
final Message message = new Message(messageId, groupId, timestamp, raw); final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(validationManager).registerMessageValidator(with(clientId), oneOf(validationManager).registerMessageValidator(with(clientId),
with(any(MessageValidator.class))); with(any(MessageValidator.class)));
@@ -138,7 +142,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
MessageValidator.class, 1)); MessageValidator.class, 1));
}}); }});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager); clientHelper, queueMessageFactory, validationManager);
@@ -161,6 +164,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class); context.mock(QueueMessageFactory.class);
final ValidationManager validationManager = final ValidationManager validationManager =
context.mock(ValidationManager.class); context.mock(ValidationManager.class);
final AtomicReference<MessageValidator> captured = final AtomicReference<MessageValidator> captured =
new AtomicReference<MessageValidator>(); new AtomicReference<MessageValidator>();
final QueueMessageValidator queueMessageValidator = final QueueMessageValidator queueMessageValidator =
@@ -171,6 +175,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
for (int i = 0; i < 8; i++) for (int i = 0; i < 8; i++)
raw[MESSAGE_HEADER_LENGTH + i] = (byte) 0xFF; raw[MESSAGE_HEADER_LENGTH + i] = (byte) 0xFF;
final Message message = new Message(messageId, groupId, timestamp, raw); final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(validationManager).registerMessageValidator(with(clientId), oneOf(validationManager).registerMessageValidator(with(clientId),
with(any(MessageValidator.class))); with(any(MessageValidator.class)));
@@ -178,7 +183,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
MessageValidator.class, 1)); MessageValidator.class, 1));
}}); }});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager); clientHelper, queueMessageFactory, validationManager);
@@ -201,6 +205,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class); context.mock(QueueMessageFactory.class);
final ValidationManager validationManager = final ValidationManager validationManager =
context.mock(ValidationManager.class); context.mock(ValidationManager.class);
final AtomicReference<MessageValidator> captured = final AtomicReference<MessageValidator> captured =
new AtomicReference<MessageValidator>(); new AtomicReference<MessageValidator>();
final QueueMessageValidator queueMessageValidator = final QueueMessageValidator queueMessageValidator =
@@ -210,6 +215,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final MessageId messageId = new MessageId(TestUtils.getRandomId()); final MessageId messageId = new MessageId(TestUtils.getRandomId());
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
final Message message = new Message(messageId, groupId, timestamp, raw); final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(validationManager).registerMessageValidator(with(clientId), oneOf(validationManager).registerMessageValidator(with(clientId),
with(any(MessageValidator.class))); with(any(MessageValidator.class)));
@@ -221,7 +227,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
will(returnValue(messageMetadata)); will(returnValue(messageMetadata));
}}); }});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager); clientHelper, queueMessageFactory, validationManager);
@@ -249,6 +254,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>(); new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook = final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class); context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false); final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata(); final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123]; final byte[] queueState = new byte[123];
@@ -257,6 +263,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final MessageId messageId = new MessageId(TestUtils.getRandomId()); final MessageId messageId = new MessageId(TestUtils.getRandomId());
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
final Message message = new Message(messageId, groupId, timestamp, raw); final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId), oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class))); with(any(IncomingMessageHook.class)));
@@ -272,7 +279,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
oneOf(db).deleteMessageMetadata(txn, messageId); oneOf(db).deleteMessageMetadata(txn, messageId);
}}); }});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager); clientHelper, queueMessageFactory, validationManager);
@@ -300,6 +306,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>(); new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook = final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class); context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false); final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata(); final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123]; final byte[] queueState = new byte[123];
@@ -310,6 +317,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
ByteUtils.writeUint64(1L, raw, MESSAGE_HEADER_LENGTH); ByteUtils.writeUint64(1L, raw, MESSAGE_HEADER_LENGTH);
final Message message = new Message(messageId, groupId, timestamp, raw); final Message message = new Message(messageId, groupId, timestamp, raw);
final BdfList pending = BdfList.of(BdfList.of(1L, messageId)); final BdfList pending = BdfList.of(BdfList.of(1L, messageId));
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId), oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class))); with(any(IncomingMessageHook.class)));
@@ -327,7 +335,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
with(any(Metadata.class))); with(any(Metadata.class)));
}}); }});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager); clientHelper, queueMessageFactory, validationManager);
@@ -355,6 +362,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>(); new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook = final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class); context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false); final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata(); final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123]; final byte[] queueState = new byte[123];
@@ -364,6 +372,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
final Message message = new Message(messageId, groupId, timestamp, raw); final Message message = new Message(messageId, groupId, timestamp, raw);
final Metadata messageMetadata = new Metadata(); final Metadata messageMetadata = new Metadata();
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId), oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class))); with(any(IncomingMessageHook.class)));
@@ -374,17 +383,16 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
// Queue position 0 is expected // Queue position 0 is expected
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
will(new DecodeQueueStateAction(0L, 0L, new BdfList())); will(new DecodeQueueStateAction(0L, 0L, new BdfList()));
// The message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
with(any(QueueMessage.class)), with(messageMetadata));
// Queue position 1 should be expected next // Queue position 1 should be expected next
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
will(new EncodeQueueStateAction(0L, 1L, new BdfList())); will(new EncodeQueueStateAction(0L, 1L, new BdfList()));
oneOf(db).mergeGroupMetadata(with(txn), with(groupId), oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
with(any(Metadata.class))); with(any(Metadata.class)));
// The message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
with(any(QueueMessage.class)), with(messageMetadata));
}}); }});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager); clientHelper, queueMessageFactory, validationManager);
@@ -412,6 +420,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>(); new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook = final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class); context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false); final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata(); final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123]; final byte[] queueState = new byte[123];
@@ -428,6 +437,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
timestamp, 1L, raw1); timestamp, 1L, raw1);
final Metadata messageMetadata1 = new Metadata(); final Metadata messageMetadata1 = new Metadata();
final BdfList pending = BdfList.of(BdfList.of(1L, messageId1)); final BdfList pending = BdfList.of(BdfList.of(1L, messageId1));
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId), oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class))); with(any(IncomingMessageHook.class)));
@@ -438,7 +448,12 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
// Queue position 0 is expected, position 1 is pending // Queue position 0 is expected, position 1 is pending
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
will(new DecodeQueueStateAction(0L, 0L, pending)); will(new DecodeQueueStateAction(0L, 0L, pending));
// The message should be delegated // Queue position 2 should be expected next
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
will(new EncodeQueueStateAction(0L, 2L, new BdfList()));
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
with(any(Metadata.class)));
// The new message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(with(txn), oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
with(any(QueueMessage.class)), with(messageMetadata)); with(any(QueueMessage.class)), with(messageMetadata));
// The pending message should be retrieved // The pending message should be retrieved
@@ -451,14 +466,8 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
// The pending message should be delegated // The pending message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(txn, message1, oneOf(incomingQueueMessageHook).incomingMessage(txn, message1,
messageMetadata1); messageMetadata1);
// Queue position 2 should be expected next
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
will(new EncodeQueueStateAction(0L, 2L, new BdfList()));
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
with(any(Metadata.class)));
}}); }});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager); clientHelper, queueMessageFactory, validationManager);