mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-18 13:49:53 +01:00
Added message queue manager.
This commit is contained in:
@@ -0,0 +1,38 @@
|
|||||||
|
package org.briarproject.api.clients;
|
||||||
|
|
||||||
|
import org.briarproject.api.db.DbException;
|
||||||
|
import org.briarproject.api.db.Metadata;
|
||||||
|
import org.briarproject.api.db.Transaction;
|
||||||
|
import org.briarproject.api.sync.ClientId;
|
||||||
|
import org.briarproject.api.sync.Group;
|
||||||
|
|
||||||
|
public interface MessageQueueManager {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The key used for storing the queue's state in the group metadata.
|
||||||
|
*/
|
||||||
|
String QUEUE_STATE_KEY = "queueState";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends a message using the given queue.
|
||||||
|
*/
|
||||||
|
QueueMessage sendMessage(Transaction txn, Group queue, long timestamp,
|
||||||
|
byte[] body, Metadata meta) throws DbException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the message validator for the given client.
|
||||||
|
*/
|
||||||
|
void registerMessageValidator(ClientId c, QueueMessageValidator v);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the incoming message hook for the given client. The hook will be
|
||||||
|
* called once for each incoming message that passes validation. Messages
|
||||||
|
* are passed to the hook in order.
|
||||||
|
*/
|
||||||
|
void registerIncomingMessageHook(ClientId c, IncomingQueueMessageHook hook);
|
||||||
|
|
||||||
|
interface IncomingQueueMessageHook {
|
||||||
|
void incomingMessage(Transaction txn, QueueMessage q, Metadata meta)
|
||||||
|
throws DbException;
|
||||||
|
}
|
||||||
|
}
|
||||||
28
briar-api/src/org/briarproject/api/clients/QueueMessage.java
Normal file
28
briar-api/src/org/briarproject/api/clients/QueueMessage.java
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package org.briarproject.api.clients;
|
||||||
|
|
||||||
|
import org.briarproject.api.sync.GroupId;
|
||||||
|
import org.briarproject.api.sync.Message;
|
||||||
|
import org.briarproject.api.sync.MessageId;
|
||||||
|
|
||||||
|
import static org.briarproject.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
|
||||||
|
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||||
|
|
||||||
|
public class QueueMessage extends Message {
|
||||||
|
|
||||||
|
public static final int QUEUE_MESSAGE_HEADER_LENGTH =
|
||||||
|
MESSAGE_HEADER_LENGTH + 8;
|
||||||
|
public static final int MAX_QUEUE_MESSAGE_BODY_LENGTH =
|
||||||
|
MAX_MESSAGE_BODY_LENGTH - 8;
|
||||||
|
|
||||||
|
private final long queuePosition;
|
||||||
|
|
||||||
|
public QueueMessage(MessageId id, GroupId groupId, long timestamp,
|
||||||
|
long queuePosition, byte[] raw) {
|
||||||
|
super(id, groupId, timestamp, raw);
|
||||||
|
this.queuePosition = queuePosition;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getQueuePosition() {
|
||||||
|
return queuePosition;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
package org.briarproject.api.clients;
|
||||||
|
|
||||||
|
import org.briarproject.api.sync.GroupId;
|
||||||
|
import org.briarproject.api.sync.MessageId;
|
||||||
|
|
||||||
|
public interface QueueMessageFactory {
|
||||||
|
|
||||||
|
QueueMessage createMessage(GroupId groupId, long timestamp,
|
||||||
|
long queuePosition, byte[] body);
|
||||||
|
|
||||||
|
QueueMessage createMessage(MessageId id, byte[] raw);
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package org.briarproject.api.clients;
|
||||||
|
|
||||||
|
import org.briarproject.api.db.Metadata;
|
||||||
|
import org.briarproject.api.sync.Group;
|
||||||
|
|
||||||
|
public interface QueueMessageValidator {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates the given message and returns its metadata if the message
|
||||||
|
* is valid, or null if the message is invalid.
|
||||||
|
*/
|
||||||
|
Metadata validateMessage(QueueMessage q, Group g);
|
||||||
|
}
|
||||||
@@ -3,13 +3,17 @@ package org.briarproject.clients;
|
|||||||
import com.google.inject.AbstractModule;
|
import com.google.inject.AbstractModule;
|
||||||
|
|
||||||
import org.briarproject.api.clients.ClientHelper;
|
import org.briarproject.api.clients.ClientHelper;
|
||||||
|
import org.briarproject.api.clients.MessageQueueManager;
|
||||||
import org.briarproject.api.clients.PrivateGroupFactory;
|
import org.briarproject.api.clients.PrivateGroupFactory;
|
||||||
|
import org.briarproject.api.clients.QueueMessageFactory;
|
||||||
|
|
||||||
public class ClientsModule extends AbstractModule {
|
public class ClientsModule extends AbstractModule {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
bind(ClientHelper.class).to(ClientHelperImpl.class);
|
bind(ClientHelper.class).to(ClientHelperImpl.class);
|
||||||
|
bind(MessageQueueManager.class).to(MessageQueueManagerImpl.class);
|
||||||
bind(PrivateGroupFactory.class).to(PrivateGroupFactoryImpl.class);
|
bind(PrivateGroupFactory.class).to(PrivateGroupFactoryImpl.class);
|
||||||
|
bind(QueueMessageFactory.class).to(QueueMessageFactoryImpl.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,211 @@
|
|||||||
|
package org.briarproject.clients;
|
||||||
|
|
||||||
|
import org.briarproject.api.FormatException;
|
||||||
|
import org.briarproject.api.clients.ClientHelper;
|
||||||
|
import org.briarproject.api.clients.MessageQueueManager;
|
||||||
|
import org.briarproject.api.clients.QueueMessage;
|
||||||
|
import org.briarproject.api.clients.QueueMessageFactory;
|
||||||
|
import org.briarproject.api.clients.QueueMessageValidator;
|
||||||
|
import org.briarproject.api.data.BdfDictionary;
|
||||||
|
import org.briarproject.api.data.BdfList;
|
||||||
|
import org.briarproject.api.db.DatabaseComponent;
|
||||||
|
import org.briarproject.api.db.DbException;
|
||||||
|
import org.briarproject.api.db.Metadata;
|
||||||
|
import org.briarproject.api.db.Transaction;
|
||||||
|
import org.briarproject.api.sync.ClientId;
|
||||||
|
import org.briarproject.api.sync.Group;
|
||||||
|
import org.briarproject.api.sync.GroupId;
|
||||||
|
import org.briarproject.api.sync.Message;
|
||||||
|
import org.briarproject.api.sync.MessageId;
|
||||||
|
import org.briarproject.api.sync.MessageValidator;
|
||||||
|
import org.briarproject.api.sync.ValidationManager;
|
||||||
|
import org.briarproject.api.sync.ValidationManager.IncomingMessageHook;
|
||||||
|
import org.briarproject.util.ByteUtils;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
import static org.briarproject.api.clients.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
|
||||||
|
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||||
|
|
||||||
|
class MessageQueueManagerImpl implements MessageQueueManager {
|
||||||
|
|
||||||
|
private static final String OUTGOING_POSITION_KEY = "nextOut";
|
||||||
|
private static final String INCOMING_POSITION_KEY = "nextIn";
|
||||||
|
private static final String PENDING_MESSAGES_KEY = "pending";
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
Logger.getLogger(MessageQueueManagerImpl.class.getName());
|
||||||
|
|
||||||
|
private final DatabaseComponent db;
|
||||||
|
private final ClientHelper clientHelper;
|
||||||
|
private final QueueMessageFactory queueMessageFactory;
|
||||||
|
private final ValidationManager validationManager;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
MessageQueueManagerImpl(DatabaseComponent db, ClientHelper clientHelper,
|
||||||
|
QueueMessageFactory queueMessageFactory,
|
||||||
|
ValidationManager validationManager) {
|
||||||
|
this.db = db;
|
||||||
|
this.clientHelper = clientHelper;
|
||||||
|
this.queueMessageFactory = queueMessageFactory;
|
||||||
|
this.validationManager = validationManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueMessage sendMessage(Transaction txn, Group queue,
|
||||||
|
long timestamp, byte[] body, Metadata meta) throws DbException {
|
||||||
|
QueueState queueState = loadQueueState(txn, queue.getId());
|
||||||
|
long queuePosition = queueState.outgoingPosition;
|
||||||
|
queueState.outgoingPosition++;
|
||||||
|
saveQueueState(txn, queue.getId(), queueState);
|
||||||
|
QueueMessage q = queueMessageFactory.createMessage(queue.getId(),
|
||||||
|
timestamp, queuePosition, body);
|
||||||
|
db.addLocalMessage(txn, q, queue.getClientId(), meta, true);
|
||||||
|
return q;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerMessageValidator(ClientId c, QueueMessageValidator v) {
|
||||||
|
validationManager.registerMessageValidator(c,
|
||||||
|
new DelegatingMessageValidator(v));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerIncomingMessageHook(ClientId c,
|
||||||
|
IncomingQueueMessageHook hook) {
|
||||||
|
validationManager.registerIncomingMessageHook(c,
|
||||||
|
new DelegatingIncomingMessageHook(hook));
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueState loadQueueState(Transaction txn, GroupId g)
|
||||||
|
throws DbException {
|
||||||
|
try {
|
||||||
|
TreeMap<Long, MessageId> pending = new TreeMap<Long, MessageId>();
|
||||||
|
Metadata groupMeta = db.getGroupMetadata(txn, g);
|
||||||
|
byte[] raw = groupMeta.get(QUEUE_STATE_KEY);
|
||||||
|
if (raw == null) return new QueueState(0, 0, pending);
|
||||||
|
BdfDictionary d = clientHelper.toDictionary(raw, 0, raw.length);
|
||||||
|
long outgoingPosition = d.getLong(OUTGOING_POSITION_KEY);
|
||||||
|
long incomingPosition = d.getLong(INCOMING_POSITION_KEY);
|
||||||
|
BdfList pendingList = d.getList(PENDING_MESSAGES_KEY);
|
||||||
|
for (int i = 0; i < pendingList.size(); i++) {
|
||||||
|
BdfList item = pendingList.getList(i);
|
||||||
|
if (item.size() != 2) throw new FormatException();
|
||||||
|
pending.put(item.getLong(0), new MessageId(item.getRaw(1)));
|
||||||
|
}
|
||||||
|
return new QueueState(outgoingPosition, incomingPosition, pending);
|
||||||
|
} catch (FormatException e) {
|
||||||
|
throw new DbException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void saveQueueState(Transaction txn, GroupId g,
|
||||||
|
QueueState queueState) throws DbException {
|
||||||
|
try {
|
||||||
|
BdfDictionary d = new BdfDictionary();
|
||||||
|
d.put(OUTGOING_POSITION_KEY, queueState.outgoingPosition);
|
||||||
|
d.put(INCOMING_POSITION_KEY, queueState.incomingPosition);
|
||||||
|
BdfList pendingList = new BdfList();
|
||||||
|
for (Entry<Long, MessageId> e : queueState.pending.entrySet())
|
||||||
|
pendingList.add(BdfList.of(e.getKey(), e.getValue()));
|
||||||
|
d.put(PENDING_MESSAGES_KEY, pendingList);
|
||||||
|
Metadata groupMeta = new Metadata();
|
||||||
|
groupMeta.put(QUEUE_STATE_KEY, clientHelper.toByteArray(d));
|
||||||
|
db.mergeGroupMetadata(txn, g, groupMeta);
|
||||||
|
} catch (FormatException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class QueueState {
|
||||||
|
|
||||||
|
private long outgoingPosition, incomingPosition;
|
||||||
|
private final TreeMap<Long, MessageId> pending;
|
||||||
|
|
||||||
|
QueueState(long outgoingPosition, long incomingPosition,
|
||||||
|
TreeMap<Long, MessageId> pending) {
|
||||||
|
this.outgoingPosition = outgoingPosition;
|
||||||
|
this.incomingPosition = incomingPosition;
|
||||||
|
this.pending = pending;
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageId popIncomingMessageId() {
|
||||||
|
Iterator<Entry<Long, MessageId>> it = pending.entrySet().iterator();
|
||||||
|
if (!it.hasNext()) return null;
|
||||||
|
Entry<Long, MessageId> e = it.next();
|
||||||
|
if (!e.getKey().equals(incomingPosition)) return null;
|
||||||
|
it.remove();
|
||||||
|
incomingPosition++;
|
||||||
|
return e.getValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class DelegatingMessageValidator
|
||||||
|
implements MessageValidator {
|
||||||
|
|
||||||
|
private final QueueMessageValidator delegate;
|
||||||
|
|
||||||
|
DelegatingMessageValidator(QueueMessageValidator delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Metadata validateMessage(Message m, Group g) {
|
||||||
|
byte[] raw = m.getRaw();
|
||||||
|
if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH) return null;
|
||||||
|
long queuePosition = ByteUtils.readUint64(raw,
|
||||||
|
MESSAGE_HEADER_LENGTH);
|
||||||
|
if (queuePosition < 0) return null;
|
||||||
|
QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(),
|
||||||
|
m.getTimestamp(), queuePosition, raw);
|
||||||
|
return delegate.validateMessage(q, g);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class DelegatingIncomingMessageHook implements IncomingMessageHook {
|
||||||
|
|
||||||
|
private final IncomingQueueMessageHook delegate;
|
||||||
|
|
||||||
|
DelegatingIncomingMessageHook(IncomingQueueMessageHook delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incomingMessage(Transaction txn, Message m, Metadata meta)
|
||||||
|
throws DbException {
|
||||||
|
long queuePosition = ByteUtils.readUint64(m.getRaw(),
|
||||||
|
MESSAGE_HEADER_LENGTH);
|
||||||
|
QueueState queueState = loadQueueState(txn, m.getGroupId());
|
||||||
|
if (queuePosition < queueState.incomingPosition) {
|
||||||
|
// A message with this queue position has already been seen
|
||||||
|
LOG.warning("Deleting message with duplicate position");
|
||||||
|
db.deleteMessage(txn, m.getId());
|
||||||
|
db.deleteMessageMetadata(txn, m.getId());
|
||||||
|
} else if (queuePosition > queueState.incomingPosition) {
|
||||||
|
// The message is out of order, add it to the pending list
|
||||||
|
queueState.pending.put(queuePosition, m.getId());
|
||||||
|
saveQueueState(txn, m.getGroupId(), queueState);
|
||||||
|
} else {
|
||||||
|
// The message is in order, pass it to the delegate
|
||||||
|
QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(),
|
||||||
|
m.getTimestamp(), queuePosition, m.getRaw());
|
||||||
|
delegate.incomingMessage(txn, q, meta);
|
||||||
|
queueState.incomingPosition++;
|
||||||
|
// Pass any consecutive messages to the delegate
|
||||||
|
MessageId id;
|
||||||
|
while ((id = queueState.popIncomingMessageId()) != null) {
|
||||||
|
byte[] raw = db.getRawMessage(txn, id);
|
||||||
|
meta = db.getMessageMetadata(txn, id);
|
||||||
|
q = queueMessageFactory.createMessage(id, raw);
|
||||||
|
delegate.incomingMessage(txn, q, meta);
|
||||||
|
}
|
||||||
|
saveQueueState(txn, m.getGroupId(), queueState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,28 +3,26 @@ package org.briarproject.clients;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
|
||||||
import org.briarproject.api.Bytes;
|
import org.briarproject.api.Bytes;
|
||||||
|
import org.briarproject.api.FormatException;
|
||||||
|
import org.briarproject.api.clients.ClientHelper;
|
||||||
import org.briarproject.api.clients.PrivateGroupFactory;
|
import org.briarproject.api.clients.PrivateGroupFactory;
|
||||||
import org.briarproject.api.contact.Contact;
|
import org.briarproject.api.contact.Contact;
|
||||||
import org.briarproject.api.data.BdfWriter;
|
import org.briarproject.api.data.BdfList;
|
||||||
import org.briarproject.api.data.BdfWriterFactory;
|
|
||||||
import org.briarproject.api.identity.AuthorId;
|
import org.briarproject.api.identity.AuthorId;
|
||||||
import org.briarproject.api.sync.ClientId;
|
import org.briarproject.api.sync.ClientId;
|
||||||
import org.briarproject.api.sync.Group;
|
import org.briarproject.api.sync.Group;
|
||||||
import org.briarproject.api.sync.GroupFactory;
|
import org.briarproject.api.sync.GroupFactory;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
class PrivateGroupFactoryImpl implements PrivateGroupFactory {
|
class PrivateGroupFactoryImpl implements PrivateGroupFactory {
|
||||||
|
|
||||||
private final GroupFactory groupFactory;
|
private final GroupFactory groupFactory;
|
||||||
private final BdfWriterFactory bdfWriterFactory;
|
private final ClientHelper clientHelper;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
PrivateGroupFactoryImpl(GroupFactory groupFactory,
|
PrivateGroupFactoryImpl(GroupFactory groupFactory,
|
||||||
BdfWriterFactory bdfWriterFactory) {
|
ClientHelper clientHelper) {
|
||||||
this.groupFactory = groupFactory;
|
this.groupFactory = groupFactory;
|
||||||
this.bdfWriterFactory = bdfWriterFactory;
|
this.clientHelper = clientHelper;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -36,22 +34,12 @@ class PrivateGroupFactoryImpl implements PrivateGroupFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private byte[] createGroupDescriptor(AuthorId local, AuthorId remote) {
|
private byte[] createGroupDescriptor(AuthorId local, AuthorId remote) {
|
||||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
||||||
BdfWriter w = bdfWriterFactory.createWriter(out);
|
|
||||||
try {
|
try {
|
||||||
w.writeListStart();
|
if (Bytes.COMPARATOR.compare(local, remote) < 0)
|
||||||
if (Bytes.COMPARATOR.compare(local, remote) < 0) {
|
return clientHelper.toByteArray(BdfList.of(local, remote));
|
||||||
w.writeRaw(local.getBytes());
|
else return clientHelper.toByteArray(BdfList.of(remote, local));
|
||||||
w.writeRaw(remote.getBytes());
|
} catch (FormatException e) {
|
||||||
} else {
|
|
||||||
w.writeRaw(remote.getBytes());
|
|
||||||
w.writeRaw(local.getBytes());
|
|
||||||
}
|
|
||||||
w.writeListEnd();
|
|
||||||
} catch (IOException e) {
|
|
||||||
// Shouldn't happen with ByteArrayOutputStream
|
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
return out.toByteArray();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package org.briarproject.clients;
|
||||||
|
|
||||||
|
import org.briarproject.api.UniqueId;
|
||||||
|
import org.briarproject.api.clients.QueueMessage;
|
||||||
|
import org.briarproject.api.clients.QueueMessageFactory;
|
||||||
|
import org.briarproject.api.crypto.CryptoComponent;
|
||||||
|
import org.briarproject.api.sync.GroupId;
|
||||||
|
import org.briarproject.api.sync.MessageId;
|
||||||
|
import org.briarproject.util.ByteUtils;
|
||||||
|
|
||||||
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
import static org.briarproject.api.clients.QueueMessage.MAX_QUEUE_MESSAGE_BODY_LENGTH;
|
||||||
|
import static org.briarproject.api.clients.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH;
|
||||||
|
import static org.briarproject.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
|
||||||
|
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||||
|
|
||||||
|
class QueueMessageFactoryImpl implements QueueMessageFactory {
|
||||||
|
|
||||||
|
private final CryptoComponent crypto;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
QueueMessageFactoryImpl(CryptoComponent crypto) {
|
||||||
|
this.crypto = crypto;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueMessage createMessage(GroupId groupId, long timestamp,
|
||||||
|
long queuePosition, byte[] body) {
|
||||||
|
if (body.length > MAX_QUEUE_MESSAGE_BODY_LENGTH)
|
||||||
|
throw new IllegalArgumentException();
|
||||||
|
byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH + body.length];
|
||||||
|
System.arraycopy(groupId.getBytes(), 0, raw, 0, UniqueId.LENGTH);
|
||||||
|
ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH);
|
||||||
|
ByteUtils.writeUint64(queuePosition, raw, MESSAGE_HEADER_LENGTH);
|
||||||
|
System.arraycopy(body, 0, raw, QUEUE_MESSAGE_HEADER_LENGTH,
|
||||||
|
body.length);
|
||||||
|
MessageId id = new MessageId(crypto.hash(MessageId.LABEL, raw));
|
||||||
|
return new QueueMessage(id, groupId, timestamp, queuePosition, raw);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueMessage createMessage(MessageId id, byte[] raw) {
|
||||||
|
if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH)
|
||||||
|
throw new IllegalArgumentException();
|
||||||
|
if (raw.length > MAX_MESSAGE_LENGTH)
|
||||||
|
throw new IllegalArgumentException();
|
||||||
|
byte[] groupId = new byte[UniqueId.LENGTH];
|
||||||
|
System.arraycopy(raw, 0, groupId, 0, UniqueId.LENGTH);
|
||||||
|
long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH);
|
||||||
|
long queuePosition = ByteUtils.readUint64(raw, MESSAGE_HEADER_LENGTH);
|
||||||
|
return new QueueMessage(id, new GroupId(groupId), timestamp,
|
||||||
|
queuePosition, raw);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -20,7 +20,7 @@ class ForumListValidator extends BdfMessageValidator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BdfDictionary validateMessage(BdfList message, Group g,
|
protected BdfDictionary validateMessage(BdfList message, Group g,
|
||||||
long timestamp) throws FormatException {
|
long timestamp) throws FormatException {
|
||||||
// Version, forum list
|
// Version, forum list
|
||||||
checkSize(message, 2);
|
checkSize(message, 2);
|
||||||
|
|||||||
Reference in New Issue
Block a user