Address review comments

This commit is contained in:
Torsten Grote
2016-11-04 11:28:56 -02:00
parent 3f9a254a0b
commit 719a53dc94
6 changed files with 41 additions and 52 deletions

View File

@@ -62,26 +62,26 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook,
@Override
public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException, InvalidMessageException {
return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH);
try {
return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH);
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
}
@Override
public void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
throws DbException, InvalidMessageException {
throws DbException, FormatException {
incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH);
}
private boolean incomingMessage(Transaction txn, Message m, Metadata meta,
int headerLength) throws DbException, InvalidMessageException {
try {
byte[] raw = m.getRaw();
BdfList body = clientHelper.toList(raw, headerLength,
raw.length - headerLength);
BdfDictionary metaDictionary = metadataParser.parse(meta);
return incomingMessage(txn, m, body, metaDictionary);
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
int headerLength) throws DbException, FormatException {
byte[] raw = m.getRaw();
BdfList body = clientHelper.toList(raw, headerLength,
raw.length - headerLength);
BdfDictionary metaDictionary = metadataParser.parse(meta);
return incomingMessage(txn, m, body, metaDictionary);
}
protected void trackIncomingMessage(Transaction txn, Message m)

View File

@@ -227,16 +227,20 @@ class MessageQueueManagerImpl implements MessageQueueManager {
// Save the queue state before passing control to the delegate
saveQueueState(txn, m.getGroupId(), queueState);
// Deliver the messages to the delegate
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw);
if (LOG.isLoggable(INFO)) {
LOG.info("Delivering pending message with position "
+ q.getQueuePosition());
}
try {
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw);
if (LOG.isLoggable(INFO)) {
LOG.info("Delivering pending message with position "
+ q.getQueuePosition());
}
delegate.incomingMessage(txn, q, meta);
}
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
}
// message queues are only useful for groups with two members

View File

@@ -246,7 +246,9 @@ class IntroductionManagerImpl extends ConversationClientImpl
} else if (role == ROLE_INTRODUCEE) {
introduceeManager.incomingMessage(txn, state, message);
} else {
throw new AssertionError("Unknown role '" + role + "'");
if (LOG.isLoggable(WARNING))
LOG.warning("Unknown role '" + role + "'");
throw new DbException();
}
if (type == TYPE_RESPONSE) trackIncomingMessage(txn, m);
} catch (DbException e) {