Separate the sync layer from its clients. #112

This commit is contained in:
akwizgran
2015-12-21 14:36:24 +00:00
parent f5f572139a
commit 5355951466
117 changed files with 3160 additions and 3465 deletions

View File

@@ -1,7 +1,6 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.BdfWriter;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.identity.Author;
@@ -49,10 +48,8 @@ class AuthorFactoryImpl implements AuthorFactory {
w.writeListEnd();
} catch (IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException();
throw new RuntimeException(e);
}
MessageDigest messageDigest = crypto.getMessageDigest();
messageDigest.update(out.toByteArray());
return new AuthorId(messageDigest.digest());
return new AuthorId(crypto.hash(AuthorId.LABEL, out.toByteArray()));
}
}

View File

@@ -1,12 +1,10 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorId;
import org.briarproject.api.identity.AuthorFactory;
import java.io.IOException;
@@ -15,26 +13,18 @@ import static org.briarproject.api.identity.AuthorConstants.MAX_PUBLIC_KEY_LENGT
class AuthorReader implements ObjectReader<Author> {
private final MessageDigest messageDigest;
private final AuthorFactory authorFactory;
AuthorReader(CryptoComponent crypto) {
messageDigest = crypto.getMessageDigest();
AuthorReader(AuthorFactory authorFactory) {
this.authorFactory = authorFactory;
}
public Author readObject(BdfReader r) throws IOException {
// Set up the reader
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
r.addConsumer(digesting);
// Read and digest the data
r.readListStart();
String name = r.readString(MAX_AUTHOR_NAME_LENGTH);
if (name.length() == 0) throw new FormatException();
byte[] publicKey = r.readRaw(MAX_PUBLIC_KEY_LENGTH);
r.readListEnd();
// Reset the reader
r.removeConsumer(digesting);
// Build and return the author
AuthorId id = new AuthorId(messageDigest.digest());
return new Author(id, name, publicKey);
return authorFactory.createAuthor(name, publicKey);
}
}

View File

@@ -1,24 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.data.Consumer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/** A consumer that makes a copy of the bytes consumed. */
class CopyingConsumer implements Consumer {
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
public byte[] getCopy() {
return out.toByteArray();
}
public void write(byte b) throws IOException {
out.write(b);
}
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
}

View File

@@ -1,34 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.data.Consumer;
import java.io.IOException;
/**
* A consumer that counts the number of bytes consumed and throws a
* FormatException if the count exceeds a given limit.
*/
class CountingConsumer implements Consumer {
private final long limit;
private long count = 0;
public CountingConsumer(long limit) {
this.limit = limit;
}
public long getCount() {
return count;
}
public void write(byte b) throws IOException {
count++;
if (count > limit) throw new FormatException();
}
public void write(byte[] b, int off, int len) throws IOException {
count += len;
if (count > limit) throw new FormatException();
}
}

View File

@@ -1,22 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.Consumer;
/** A consumer that passes its input through a message digest. */
class DigestingConsumer implements Consumer {
private final MessageDigest messageDigest;
public DigestingConsumer(MessageDigest messageDigest) {
this.messageDigest = messageDigest;
}
public void write(byte b) {
messageDigest.update(b);
}
public void write(byte[] b, int off, int len) {
messageDigest.update(b, off, len);
}
}

View File

@@ -10,21 +10,21 @@ import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.MessageValidatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.system.Clock;
@@ -39,15 +39,15 @@ import java.util.logging.Logger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
/**
* An outgoing {@link org.briarproject.api.sync.MessagingSession
* MessagingSession} suitable for duplex transports. The session offers
* messages before sending them, keeps its output stream open when there are no
* packets to send, and reacts to events that make packets available to send.
* An outgoing {@link org.briarproject.api.sync.SyncSession SyncSession}
* suitable for duplex transports. The session offers messages before sending
* them, keeps its output stream open when there are no packets to send, and
* reacts to events that make packets available to send.
*/
class DuplexOutgoingSession implements MessagingSession, EventListener {
class DuplexOutgoingSession implements SyncSession, EventListener {
// Check for retransmittable packets once every 60 seconds
private static final int RETX_QUERY_INTERVAL = 60 * 1000;
@@ -161,8 +161,9 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
if (e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageAddedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if (e instanceof MessageValidatedEvent) {
if (((MessageValidatedEvent) e).isValid())
dbExecutor.execute(new GenerateOffer());
} else if (e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
@@ -243,7 +244,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
if (interrupted) return;
try {
Collection<byte[]> b = db.generateRequestedBatch(contactId,
MAX_PAYLOAD_LENGTH, maxLatency);
MAX_PACKET_PAYLOAD_LENGTH, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b != null) writerTasks.add(new WriteBatch(b));

View File

@@ -1,52 +1,24 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.BdfWriter;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.GroupId;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.inject.Inject;
import static org.briarproject.api.sync.MessagingConstants.GROUP_SALT_LENGTH;
class GroupFactoryImpl implements GroupFactory {
private final CryptoComponent crypto;
private final BdfWriterFactory bdfWriterFactory;
@Inject
GroupFactoryImpl(CryptoComponent crypto, BdfWriterFactory bdfWriterFactory) {
GroupFactoryImpl(CryptoComponent crypto) {
this.crypto = crypto;
this.bdfWriterFactory = bdfWriterFactory;
}
public Group createGroup(String name) {
byte[] salt = new byte[GROUP_SALT_LENGTH];
crypto.getSecureRandom().nextBytes(salt);
return createGroup(name, salt);
}
public Group createGroup(String name, byte[] salt) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
BdfWriter w = bdfWriterFactory.createWriter(out);
try {
w.writeListStart();
w.writeString(name);
w.writeRaw(salt);
w.writeListEnd();
} catch (IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException();
}
MessageDigest messageDigest = crypto.getMessageDigest();
messageDigest.update(out.toByteArray());
GroupId id = new GroupId(messageDigest.digest());
return new Group(id, name, salt);
public Group createGroup(ClientId c, byte[] descriptor) {
byte[] hash = crypto.hash(GroupId.LABEL, c.getBytes(), descriptor);
return new Group(new GroupId(hash), c, descriptor);
}
}

View File

@@ -1,39 +1,31 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.UniqueId;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.GroupFactory;
import java.io.IOException;
import static org.briarproject.api.sync.MessagingConstants.GROUP_SALT_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_GROUP_NAME_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
class GroupReader implements ObjectReader<Group> {
private final MessageDigest messageDigest;
private final GroupFactory groupFactory;
GroupReader(CryptoComponent crypto) {
messageDigest = crypto.getMessageDigest();
GroupReader(GroupFactory groupFactory) {
this.groupFactory = groupFactory;
}
public Group readObject(BdfReader r) throws IOException {
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
// Read and digest the data
r.addConsumer(digesting);
r.readListStart();
String name = r.readString(MAX_GROUP_NAME_LENGTH);
if (name.length() == 0) throw new FormatException();
byte[] salt = r.readRaw(GROUP_SALT_LENGTH);
if (salt.length != GROUP_SALT_LENGTH) throw new FormatException();
byte[] id = r.readRaw(UniqueId.LENGTH);
if (id.length != UniqueId.LENGTH) throw new FormatException();
byte[] descriptor = r.readRaw(MAX_GROUP_DESCRIPTOR_LENGTH);
r.readListEnd();
r.removeConsumer(digesting);
// Build and return the group
GroupId id = new GroupId(messageDigest.digest());
return new Group(id, name, salt);
return groupFactory.createGroup(new ClientId(id), descriptor);
}
}

View File

@@ -13,37 +13,30 @@ import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
/**
* An incoming {@link org.briarproject.api.sync.MessagingSession
* MessagingSession}.
*/
class IncomingSession implements MessagingSession, EventListener {
/** An incoming {@link org.briarproject.api.sync.SyncSession SyncSession}. */
class IncomingSession implements SyncSession, EventListener {
private static final Logger LOG =
Logger.getLogger(IncomingSession.class.getName());
private final DatabaseComponent db;
private final Executor dbExecutor, cryptoExecutor;
private final Executor dbExecutor;
private final EventBus eventBus;
private final MessageVerifier messageVerifier;
private final ContactId contactId;
private final TransportId transportId;
private final PacketReader packetReader;
@@ -51,14 +44,11 @@ class IncomingSession implements MessagingSession, EventListener {
private volatile boolean interrupted = false;
IncomingSession(DatabaseComponent db, Executor dbExecutor,
Executor cryptoExecutor, EventBus eventBus,
MessageVerifier messageVerifier, ContactId contactId,
TransportId transportId, PacketReader packetReader) {
EventBus eventBus, ContactId contactId, TransportId transportId,
PacketReader packetReader) {
this.db = db;
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.eventBus = eventBus;
this.messageVerifier = messageVerifier;
this.contactId = contactId;
this.transportId = transportId;
this.packetReader = packetReader;
@@ -73,8 +63,8 @@ class IncomingSession implements MessagingSession, EventListener {
Ack a = packetReader.readAck();
dbExecutor.execute(new ReceiveAck(a));
} else if (packetReader.hasMessage()) {
UnverifiedMessage m = packetReader.readMessage();
cryptoExecutor.execute(new VerifyMessage(m));
Message m = packetReader.readMessage();
dbExecutor.execute(new ReceiveMessage(m));
} else if (packetReader.hasOffer()) {
Offer o = packetReader.readOffer();
dbExecutor.execute(new ReceiveOffer(o));
@@ -137,25 +127,6 @@ class IncomingSession implements MessagingSession, EventListener {
}
}
private class VerifyMessage implements Runnable {
private final UnverifiedMessage message;
private VerifyMessage(UnverifiedMessage message) {
this.message = message;
}
public void run() {
try {
Message m = messageVerifier.verifyMessage(message);
dbExecutor.execute(new ReceiveMessage(m));
} catch (GeneralSecurityException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveMessage implements Runnable {
private final Message message;

View File

@@ -1,129 +1,39 @@
package org.briarproject.sync;
import com.google.inject.Inject;
import org.briarproject.api.UniqueId;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.crypto.PrivateKey;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.data.BdfWriter;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.data.Consumer;
import org.briarproject.api.identity.Author;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageFactory;
import org.briarproject.api.sync.MessageId;
import org.briarproject.util.StringUtils;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.inject.Inject;
import static org.briarproject.api.identity.AuthorConstants.MAX_SIGNATURE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_BODY_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MESSAGE_SALT_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
class MessageFactoryImpl implements MessageFactory {
private final Signature signature;
private final SecureRandom random;
private final MessageDigest messageDigest;
private final BdfWriterFactory bdfWriterFactory;
private final CryptoComponent crypto;
@Inject
MessageFactoryImpl(CryptoComponent crypto, BdfWriterFactory bdfWriterFactory) {
signature = crypto.getSignature();
random = crypto.getSecureRandom();
messageDigest = crypto.getMessageDigest();
this.bdfWriterFactory = bdfWriterFactory;
MessageFactoryImpl(CryptoComponent crypto) {
this.crypto = crypto;
}
public Message createAnonymousMessage(MessageId parent, Group group,
String contentType, long timestamp, byte[] body) throws IOException,
GeneralSecurityException {
return createMessage(parent, group, null, null, contentType, timestamp,
body);
}
public Message createPseudonymousMessage(MessageId parent, Group group,
Author author, PrivateKey privateKey, String contentType,
long timestamp, byte[] body) throws IOException,
GeneralSecurityException {
return createMessage(parent, group, author, privateKey, contentType,
timestamp, body);
}
private Message createMessage(MessageId parent, Group group, Author author,
PrivateKey privateKey, String contentType, long timestamp,
byte[] body) throws IOException, GeneralSecurityException {
// Validate the arguments
if ((author == null) != (privateKey == null))
@Override
public Message createMessage(GroupId groupId, long timestamp, byte[] body)
throws IOException {
if (body.length > MAX_MESSAGE_BODY_LENGTH)
throw new IllegalArgumentException();
if (StringUtils.toUtf8(contentType).length > MAX_CONTENT_TYPE_LENGTH)
throw new IllegalArgumentException();
if (body.length > MAX_BODY_LENGTH)
throw new IllegalArgumentException();
// Serialise the message to a buffer
ByteArrayOutputStream out = new ByteArrayOutputStream();
BdfWriter w = bdfWriterFactory.createWriter(out);
// Initialise the consumers
CountingConsumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
w.addConsumer(counting);
Consumer digestingConsumer = new DigestingConsumer(messageDigest);
w.addConsumer(digestingConsumer);
Consumer signingConsumer = null;
if (privateKey != null) {
signature.initSign(privateKey);
signingConsumer = new SigningConsumer(signature);
w.addConsumer(signingConsumer);
}
// Write the message
w.writeListStart();
if (parent == null) w.writeNull();
else w.writeRaw(parent.getBytes());
writeGroup(w, group);
if (author == null) w.writeNull();
else writeAuthor(w, author);
w.writeString(contentType);
w.writeInteger(timestamp);
byte[] salt = new byte[MESSAGE_SALT_LENGTH];
random.nextBytes(salt);
w.writeRaw(salt);
w.writeRaw(body);
int bodyStart = (int) counting.getCount() - body.length;
// Sign the message with the author's private key, if there is one
if (privateKey == null) {
w.writeNull();
} else {
w.removeConsumer(signingConsumer);
byte[] sig = signature.sign();
if (sig.length > MAX_SIGNATURE_LENGTH)
throw new IllegalArgumentException();
w.writeRaw(sig);
}
w.writeListEnd();
// Hash the message, including the signature, to get the message ID
w.removeConsumer(digestingConsumer);
MessageId id = new MessageId(messageDigest.digest());
return new MessageImpl(id, parent, group, author, contentType,
timestamp, out.toByteArray(), bodyStart, body.length);
}
private void writeGroup(BdfWriter w, Group g) throws IOException {
w.writeListStart();
w.writeString(g.getName());
w.writeRaw(g.getSalt());
w.writeListEnd();
}
private void writeAuthor(BdfWriter w, Author a) throws IOException {
w.writeListStart();
w.writeString(a.getName());
w.writeRaw(a.getPublicKey());
w.writeListEnd();
byte[] raw = new byte[MESSAGE_HEADER_LENGTH + body.length];
System.arraycopy(groupId.getBytes(), 0, raw, 0, UniqueId.LENGTH);
ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH);
System.arraycopy(body, 0, raw, MESSAGE_HEADER_LENGTH, body.length);
MessageId id = new MessageId(crypto.hash(MessageId.LABEL, raw));
return new Message(id, groupId, timestamp, raw);
}
}

View File

@@ -1,84 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.identity.Author;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import static org.briarproject.api.sync.MessagingConstants.MAX_BODY_LENGTH;
/** A simple in-memory implementation of a message. */
class MessageImpl implements Message {
private final MessageId id, parent;
private final Group group;
private final Author author;
private final String contentType;
private final long timestamp;
private final byte[] raw;
private final int bodyStart, bodyLength;
public MessageImpl(MessageId id, MessageId parent, Group group,
Author author, String contentType, long timestamp,
byte[] raw, int bodyStart, int bodyLength) {
if (bodyStart + bodyLength > raw.length)
throw new IllegalArgumentException();
if (bodyLength > MAX_BODY_LENGTH)
throw new IllegalArgumentException();
this.id = id;
this.parent = parent;
this.group = group;
this.author = author;
this.contentType = contentType;
this.timestamp = timestamp;
this.raw = raw;
this.bodyStart = bodyStart;
this.bodyLength = bodyLength;
}
public MessageId getId() {
return id;
}
public MessageId getParent() {
return parent;
}
public Group getGroup() {
return group;
}
public Author getAuthor() {
return author;
}
public String getContentType() {
return contentType;
}
public long getTimestamp() {
return timestamp;
}
public byte[] getSerialised() {
return raw;
}
public int getBodyStart() {
return bodyStart;
}
public int getBodyLength() {
return bodyLength;
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public boolean equals(Object o) {
return o instanceof Message && id.equals(((Message) o).getId());
}
}

View File

@@ -1,82 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.UniqueId;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.identity.Author;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.UnverifiedMessage;
import java.io.IOException;
import static org.briarproject.api.identity.AuthorConstants.MAX_SIGNATURE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_BODY_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MESSAGE_SALT_LENGTH;
class MessageReader implements ObjectReader<UnverifiedMessage> {
private final ObjectReader<Group> groupReader;
private final ObjectReader<Author> authorReader;
MessageReader(ObjectReader<Group> groupReader,
ObjectReader<Author> authorReader) {
this.groupReader = groupReader;
this.authorReader = authorReader;
}
public UnverifiedMessage readObject(BdfReader r) throws IOException {
CopyingConsumer copying = new CopyingConsumer();
CountingConsumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
r.addConsumer(copying);
r.addConsumer(counting);
// Read the start of the message
r.readListStart();
// Read the parent's message ID, if there is one
MessageId parent = null;
if (r.hasNull()) {
r.readNull();
} else {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length < UniqueId.LENGTH) throw new FormatException();
parent = new MessageId(b);
}
// Read the group
Group group = groupReader.readObject(r);
// Read the author, if there is one
Author author = null;
if (r.hasNull()) r.readNull();
else author = authorReader.readObject(r);
// Read the content type
String contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
// Read the timestamp
long timestamp = r.readInteger();
if (timestamp < 0) throw new FormatException();
// Read the salt
byte[] salt = r.readRaw(MESSAGE_SALT_LENGTH);
if (salt.length < MESSAGE_SALT_LENGTH) throw new FormatException();
// Read the message body
byte[] body = r.readRaw(MAX_BODY_LENGTH);
// Record the offset of the body within the message
int bodyStart = (int) counting.getCount() - body.length;
// Record the length of the data covered by the author's signature
int signedLength = (int) counting.getCount();
// Read the author's signature, if there is one
byte[] signature = null;
if (author == null) r.readNull();
else signature = r.readRaw(MAX_SIGNATURE_LENGTH);
// Read the end of the message
r.readListEnd();
// Reset the reader
r.removeConsumer(counting);
r.removeConsumer(copying);
// Build and return the unverified message
byte[] raw = copying.getCopy();
return new UnverifiedMessage(parent, group, author, contentType,
timestamp, raw, signature, bodyStart, body.length,
signedLength);
}
}

View File

@@ -1,68 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.KeyParser;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.crypto.PublicKey;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.identity.Author;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.UnverifiedMessage;
import org.briarproject.api.system.Clock;
import java.security.GeneralSecurityException;
import java.util.logging.Logger;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static org.briarproject.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE;
class MessageVerifierImpl implements MessageVerifier {
private static final Logger LOG =
Logger.getLogger(MessageVerifierImpl.class.getName());
private final CryptoComponent crypto;
private final Clock clock;
private final KeyParser keyParser;
@Inject
MessageVerifierImpl(CryptoComponent crypto, Clock clock) {
this.crypto = crypto;
this.clock = clock;
keyParser = crypto.getSignatureKeyParser();
}
public Message verifyMessage(UnverifiedMessage m)
throws GeneralSecurityException {
long now = System.currentTimeMillis();
MessageDigest messageDigest = crypto.getMessageDigest();
Signature signature = crypto.getSignature();
// Reject the message if it's too far in the future
if (m.getTimestamp() > clock.currentTimeMillis() + MAX_CLOCK_DIFFERENCE)
throw new GeneralSecurityException();
// Hash the message to get the message ID
byte[] raw = m.getSerialised();
messageDigest.update(raw);
MessageId id = new MessageId(messageDigest.digest());
// Verify the author's signature, if there is one
Author author = m.getAuthor();
if (author != null) {
PublicKey k = keyParser.parsePublicKey(author.getPublicKey());
signature.initVerify(k);
signature.update(raw, 0, m.getSignedLength());
if (!signature.verify(m.getSignature()))
throw new GeneralSecurityException();
}
Message verified = new MessageImpl(id, m.getParent(), m.getGroup(),
author, m.getContentType(), m.getTimestamp(), raw,
m.getBodyStart(), m.getBodyLength());
long duration = System.currentTimeMillis() - now;
if (LOG.isLoggable(INFO))
LOG.info("Verifying message took " + duration + " ms");
return verified;
}
}

View File

@@ -1,11 +1,11 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import java.io.InputStream;
@@ -13,21 +13,21 @@ import javax.inject.Inject;
class PacketReaderFactoryImpl implements PacketReaderFactory {
private final CryptoComponent crypto;
private final BdfReaderFactory bdfReaderFactory;
private final ObjectReader<UnverifiedMessage> messageReader;
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
@Inject
PacketReaderFactoryImpl(BdfReaderFactory bdfReaderFactory,
ObjectReader<UnverifiedMessage> messageReader,
PacketReaderFactoryImpl(CryptoComponent crypto,
BdfReaderFactory bdfReaderFactory,
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader) {
this.crypto = crypto;
this.bdfReaderFactory = bdfReaderFactory;
this.messageReader = messageReader;
this.subscriptionUpdateReader = subscriptionUpdateReader;
}
public PacketReader createPacketReader(InputStream in) {
return new PacketReaderImpl(bdfReaderFactory, messageReader,
return new PacketReaderImpl(crypto, bdfReaderFactory,
subscriptionUpdateReader, in);
}
}

View File

@@ -4,10 +4,13 @@ import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.UniqueId;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
@@ -16,7 +19,6 @@ import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayInputStream;
@@ -31,9 +33,6 @@ import java.util.Map;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTIES_PER_TRANSPORT;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
import static org.briarproject.api.TransportPropertyConstants.MAX_TRANSPORT_ID_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.HEADER_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.PROTOCOL_VERSION;
import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.MESSAGE;
import static org.briarproject.api.sync.PacketTypes.OFFER;
@@ -42,14 +41,18 @@ import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_UPDATE;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PROTOCOL_VERSION;
// This class is not thread-safe
class PacketReaderImpl implements PacketReader {
private enum State { BUFFER_EMPTY, BUFFER_FULL, EOF }
private final CryptoComponent crypto;
private final BdfReaderFactory bdfReaderFactory;
private final ObjectReader<UnverifiedMessage> messageReader;
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
private final InputStream in;
private final byte[] header, payload;
@@ -57,24 +60,23 @@ class PacketReaderImpl implements PacketReader {
private State state = State.BUFFER_EMPTY;
private int payloadLength = 0;
PacketReaderImpl(BdfReaderFactory bdfReaderFactory,
ObjectReader<UnverifiedMessage> messageReader,
PacketReaderImpl(CryptoComponent crypto, BdfReaderFactory bdfReaderFactory,
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader,
InputStream in) {
this.crypto = crypto;
this.bdfReaderFactory = bdfReaderFactory;
this.messageReader = messageReader;
this.subscriptionUpdateReader = subscriptionUpdateReader;
this.in = in;
header = new byte[HEADER_LENGTH];
payload = new byte[MAX_PAYLOAD_LENGTH];
header = new byte[PACKET_HEADER_LENGTH];
payload = new byte[MAX_PACKET_PAYLOAD_LENGTH];
}
private void readPacket() throws IOException {
assert state == State.BUFFER_EMPTY;
if (state != State.BUFFER_EMPTY) throw new IllegalStateException();
// Read the header
int offset = 0;
while (offset < HEADER_LENGTH) {
int read = in.read(header, offset, HEADER_LENGTH - offset);
while (offset < PACKET_HEADER_LENGTH) {
int read = in.read(header, offset, PACKET_HEADER_LENGTH - offset);
if (read == -1) {
if (offset > 0) throw new FormatException();
state = State.EOF;
@@ -86,7 +88,7 @@ class PacketReaderImpl implements PacketReader {
if (header[0] != PROTOCOL_VERSION) throw new FormatException();
// Read the payload length
payloadLength = ByteUtils.readUint16(header, 2);
if (payloadLength > MAX_PAYLOAD_LENGTH) throw new FormatException();
if (payloadLength > MAX_PACKET_PAYLOAD_LENGTH) throw new FormatException();
// Read the payload
offset = 0;
while (offset < payloadLength) {
@@ -99,7 +101,7 @@ class PacketReaderImpl implements PacketReader {
public boolean eof() throws IOException {
if (state == State.BUFFER_EMPTY) readPacket();
assert state != State.BUFFER_EMPTY;
if (state == State.BUFFER_EMPTY) throw new IllegalStateException();
return state == State.EOF;
}
@@ -109,44 +111,43 @@ class PacketReaderImpl implements PacketReader {
public Ack readAck() throws IOException {
if (!hasAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the message IDs
List<MessageId> acked = new ArrayList<MessageId>();
r.readListStart();
while (!r.hasListEnd()) {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length != UniqueId.LENGTH)
throw new FormatException();
acked.add(new MessageId(b));
return new Ack(Collections.unmodifiableList(readMessageIds()));
}
private List<MessageId> readMessageIds() throws IOException {
if (payloadLength == 0) throw new FormatException();
if (payloadLength % UniqueId.LENGTH != 0) throw new FormatException();
List<MessageId> ids = new ArrayList<MessageId>();
for (int off = 0; off < payloadLength; off += UniqueId.LENGTH) {
byte[] id = new byte[UniqueId.LENGTH];
System.arraycopy(payload, off, id, 0, UniqueId.LENGTH);
ids.add(new MessageId(id));
}
if (acked.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the ack
return new Ack(Collections.unmodifiableList(acked));
return ids;
}
public boolean hasMessage() throws IOException {
return !eof() && header[1] == MESSAGE;
}
public UnverifiedMessage readMessage() throws IOException {
public Message readMessage() throws IOException {
if (!hasMessage()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read and build the message
UnverifiedMessage m = messageReader.readObject(r);
if (!r.eof()) throw new FormatException();
if (payloadLength <= MESSAGE_HEADER_LENGTH) throw new FormatException();
// Group ID
byte[] id = new byte[UniqueId.LENGTH];
System.arraycopy(payload, 0, id, 0, UniqueId.LENGTH);
GroupId groupId = new GroupId(id);
// Timestamp
long timestamp = ByteUtils.readUint64(payload, UniqueId.LENGTH);
if (timestamp < 0) throw new FormatException();
// Raw message
byte[] raw = new byte[payloadLength];
System.arraycopy(payload, 0, raw, 0, payloadLength);
state = State.BUFFER_EMPTY;
return m;
// Message ID
MessageId messageId = new MessageId(crypto.hash(MessageId.LABEL, raw));
return new Message(messageId, groupId, timestamp, raw);
}
public boolean hasOffer() throws IOException {
@@ -155,28 +156,7 @@ class PacketReaderImpl implements PacketReader {
public Offer readOffer() throws IOException {
if (!hasOffer()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the message IDs
List<MessageId> offered = new ArrayList<MessageId>();
r.readListStart();
while (!r.hasListEnd()) {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length != UniqueId.LENGTH)
throw new FormatException();
offered.add(new MessageId(b));
}
if (offered.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the offer
return new Offer(Collections.unmodifiableList(offered));
return new Offer(Collections.unmodifiableList(readMessageIds()));
}
public boolean hasRequest() throws IOException {
@@ -185,28 +165,7 @@ class PacketReaderImpl implements PacketReader {
public Request readRequest() throws IOException {
if (!hasRequest()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the message IDs
r.readListStart();
List<MessageId> requested = new ArrayList<MessageId>();
while (!r.hasListEnd()) {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length != UniqueId.LENGTH)
throw new FormatException();
requested.add(new MessageId(b));
}
if (requested.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the request
return new Request(Collections.unmodifiableList(requested));
return new Request(Collections.unmodifiableList(readMessageIds()));
}
public boolean hasSubscriptionAck() throws IOException {

View File

@@ -1,5 +1,6 @@
package org.briarproject.sync;
import org.briarproject.api.UniqueId;
import org.briarproject.api.data.BdfWriter;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.sync.Ack;
@@ -19,12 +20,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import static org.briarproject.api.data.DataConstants.LIST_END_LENGTH;
import static org.briarproject.api.data.DataConstants.LIST_START_LENGTH;
import static org.briarproject.api.data.DataConstants.UNIQUE_ID_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.HEADER_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.PROTOCOL_VERSION;
import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
@@ -32,6 +27,9 @@ import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_UPDATE;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PROTOCOL_VERSION;
// This class is not thread-safe
class PacketWriterImpl implements PacketWriter {
@@ -44,9 +42,9 @@ class PacketWriterImpl implements PacketWriter {
PacketWriterImpl(BdfWriterFactory bdfWriterFactory, OutputStream out) {
this.bdfWriterFactory = bdfWriterFactory;
this.out = out;
header = new byte[HEADER_LENGTH];
header = new byte[PACKET_HEADER_LENGTH];
header[0] = PROTOCOL_VERSION;
payload = new ByteArrayOutputStream(MAX_PAYLOAD_LENGTH);
payload = new ByteArrayOutputStream(MAX_PACKET_PAYLOAD_LENGTH);
}
public int getMaxMessagesForAck(long capacity) {
@@ -62,10 +60,9 @@ class PacketWriterImpl implements PacketWriter {
}
private int getMaxMessagesForPacket(long capacity) {
int payload = (int) Math.min(capacity - HEADER_LENGTH,
MAX_PAYLOAD_LENGTH);
int overhead = LIST_START_LENGTH * 2 + LIST_END_LENGTH * 2;
return (payload - overhead) / UNIQUE_ID_LENGTH;
int payload = (int) Math.min(capacity - PACKET_HEADER_LENGTH,
MAX_PACKET_PAYLOAD_LENGTH);
return payload / UniqueId.LENGTH;
}
private void writePacket(byte packetType) throws IOException {
@@ -77,13 +74,8 @@ class PacketWriterImpl implements PacketWriter {
}
public void writeAck(Ack a) throws IOException {
assert payload.size() == 0;
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (MessageId m : a.getMessageIds()) w.writeRaw(m.getBytes());
w.writeListEnd();
w.writeListEnd();
if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : a.getMessageIds()) payload.write(m.getBytes());
writePacket(ACK);
}
@@ -95,29 +87,19 @@ class PacketWriterImpl implements PacketWriter {
}
public void writeOffer(Offer o) throws IOException {
assert payload.size() == 0;
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (MessageId m : o.getMessageIds()) w.writeRaw(m.getBytes());
w.writeListEnd();
w.writeListEnd();
if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : o.getMessageIds()) payload.write(m.getBytes());
writePacket(OFFER);
}
public void writeRequest(Request r) throws IOException {
assert payload.size() == 0;
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (MessageId m : r.getMessageIds()) w.writeRaw(m.getBytes());
w.writeListEnd();
w.writeListEnd();
if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : r.getMessageIds()) payload.write(m.getBytes());
writePacket(REQUEST);
}
public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
assert payload.size() == 0;
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeInteger(a.getVersion());
@@ -127,14 +109,14 @@ class PacketWriterImpl implements PacketWriter {
public void writeSubscriptionUpdate(SubscriptionUpdate u)
throws IOException {
assert payload.size() == 0;
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (Group g : u.getGroups()) {
w.writeListStart();
w.writeString(g.getName());
w.writeRaw(g.getSalt());
w.writeRaw(g.getClientId().getBytes());
w.writeRaw(g.getDescriptor());
w.writeListEnd();
}
w.writeListEnd();
@@ -144,7 +126,7 @@ class PacketWriterImpl implements PacketWriter {
}
public void writeTransportAck(TransportAck a) throws IOException {
assert payload.size() == 0;
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeString(a.getId().getString());
@@ -154,7 +136,7 @@ class PacketWriterImpl implements PacketWriter {
}
public void writeTransportUpdate(TransportUpdate u) throws IOException {
assert payload.size() == 0;
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeString(u.getId().getString());

View File

@@ -1,22 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.data.Consumer;
/** A consumer that passes its input through a signature. */
class SigningConsumer implements Consumer {
private final Signature signature;
public SigningConsumer(Signature signature) {
this.signature = signature;
}
public void write(byte b) {
signature.update(b);
}
public void write(byte[] b, int off, int len) {
signature.update(b, off, len);
}
}

View File

@@ -11,10 +11,10 @@ import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
@@ -28,15 +28,15 @@ import java.util.logging.Logger;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
/**
* An outgoing {@link org.briarproject.api.sync.MessagingSession
* MessagingSession} suitable for simplex transports. The session sends
* messages without offering them, and closes its output stream when there are
* no more packets to send.
* An outgoing {@link org.briarproject.api.sync.SyncSession SyncSession}
* suitable for simplex transports. The session sends messages without offering
* them first, and closes its output stream when there are no more packets to
* send.
*/
class SimplexOutgoingSession implements MessagingSession, EventListener {
class SimplexOutgoingSession implements SyncSession, EventListener {
private static final Logger LOG =
Logger.getLogger(SimplexOutgoingSession.class.getName());
@@ -163,7 +163,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
if (interrupted) return;
try {
Collection<byte[]> b = db.generateBatch(contactId,
MAX_PAYLOAD_LENGTH, maxLatency);
MAX_PACKET_PAYLOAD_LENGTH, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();

View File

@@ -2,7 +2,6 @@ package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.Consumer;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
@@ -15,8 +14,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_SUBSCRIPTIONS;
import static org.briarproject.api.sync.SyncConstants.MAX_SUBSCRIPTIONS;
class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
@@ -27,12 +25,7 @@ class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
}
public SubscriptionUpdate readObject(BdfReader r) throws IOException {
// Set up the reader
Consumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
r.addConsumer(counting);
// Read the start of the update
r.readListStart();
// Read the subscriptions, rejecting duplicates
List<Group> groups = new ArrayList<Group>();
Set<GroupId> ids = new HashSet<GroupId>();
r.readListStart();
@@ -42,14 +35,9 @@ class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
groups.add(g);
}
r.readListEnd();
// Read the version number
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the update
r.readListEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the subscription update
groups = Collections.unmodifiableList(groups);
return new SubscriptionUpdate(groups, version);
}

View File

@@ -3,19 +3,18 @@ package org.briarproject.sync;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorFactory;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.MessageFactory;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.MessagingSessionFactory;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.PacketWriterFactory;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import org.briarproject.api.sync.SyncSessionFactory;
import org.briarproject.api.sync.ValidationManager;
import javax.inject.Singleton;
@@ -26,28 +25,20 @@ public class SyncModule extends AbstractModule {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
bind(GroupFactory.class).to(GroupFactoryImpl.class);
bind(MessageFactory.class).to(MessageFactoryImpl.class);
bind(MessageVerifier.class).to(MessageVerifierImpl.class);
bind(PacketReaderFactory.class).to(PacketReaderFactoryImpl.class);
bind(PacketWriterFactory.class).to(PacketWriterFactoryImpl.class);
bind(MessagingSessionFactory.class).to(
MessagingSessionFactoryImpl.class).in(Singleton.class);
bind(SyncSessionFactory.class).to(
SyncSessionFactoryImpl.class).in(Singleton.class);
}
@Provides
ObjectReader<Author> getAuthorReader(CryptoComponent crypto) {
return new AuthorReader(crypto);
ObjectReader<Author> getAuthorReader(AuthorFactory authorFactory) {
return new AuthorReader(authorFactory);
}
@Provides
ObjectReader<Group> getGroupReader(CryptoComponent crypto) {
return new GroupReader(crypto);
}
@Provides
ObjectReader<UnverifiedMessage> getMessageReader(
ObjectReader<Group> groupReader,
ObjectReader<Author> authorReader) {
return new MessageReader(groupReader, authorReader);
ObjectReader<Group> getGroupReader(GroupFactory groupFactory) {
return new GroupReader(groupFactory);
}
@Provides
@@ -55,4 +46,11 @@ public class SyncModule extends AbstractModule {
ObjectReader<Group> groupReader) {
return new SubscriptionUpdateReader(groupReader);
}
@Provides @Singleton
ValidationManager getValidationManager(LifecycleManager lifecycleManager,
ValidationManagerImpl validationManager) {
lifecycleManager.register(validationManager);
return validationManager;
}
}

View File

@@ -2,17 +2,15 @@ package org.briarproject.sync;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.MessagingSessionFactory;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.PacketWriterFactory;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.sync.SyncSessionFactory;
import org.briarproject.api.system.Clock;
import java.io.InputStream;
@@ -21,49 +19,44 @@ import java.util.concurrent.Executor;
import javax.inject.Inject;
class MessagingSessionFactoryImpl implements MessagingSessionFactory {
class SyncSessionFactoryImpl implements SyncSessionFactory {
private final DatabaseComponent db;
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final Executor dbExecutor;
private final EventBus eventBus;
private final Clock clock;
private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory;
@Inject
MessagingSessionFactoryImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, EventBus eventBus, Clock clock,
PacketReaderFactory packetReaderFactory,
SyncSessionFactoryImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor, EventBus eventBus,
Clock clock, PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) {
this.db = db;
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.eventBus = eventBus;
this.clock = clock;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
}
public MessagingSession createIncomingSession(ContactId c, TransportId t,
public SyncSession createIncomingSession(ContactId c, TransportId t,
InputStream in) {
PacketReader packetReader = packetReaderFactory.createPacketReader(in);
return new IncomingSession(db, dbExecutor, cryptoExecutor, eventBus,
messageVerifier, c, t, packetReader);
return new IncomingSession(db, dbExecutor, eventBus, c, t,
packetReader);
}
public MessagingSession createSimplexOutgoingSession(ContactId c,
TransportId t, int maxLatency, OutputStream out) {
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, OutputStream out) {
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, packetWriter);
}
public MessagingSession createDuplexOutgoingSession(ContactId c,
TransportId t, int maxLatency, int maxIdleTime, OutputStream out) {
public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, int maxIdleTime, OutputStream out) {
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
maxLatency, maxIdleTime, packetWriter);

View File

@@ -0,0 +1,162 @@
package org.briarproject.sync;
import com.google.inject.Inject;
import org.briarproject.api.UniqueId;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.NoSuchMessageException;
import org.briarproject.api.db.NoSuchSubscriptionException;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageValidator;
import org.briarproject.api.sync.ValidationManager;
import org.briarproject.util.ByteUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
class ValidationManagerImpl implements ValidationManager, EventListener {
private static final Logger LOG =
Logger.getLogger(ValidationManagerImpl.class.getName());
private final DatabaseComponent db;
private final Executor dbExecutor;
private final Executor cryptoExecutor;
private final EventBus eventBus;
private final Map<ClientId, MessageValidator> validators;
@Inject
ValidationManagerImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor, EventBus eventBus) {
this.db = db;
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.eventBus = eventBus;
validators = new ConcurrentHashMap<ClientId, MessageValidator>();
}
@Override
public boolean start() {
eventBus.addListener(this);
return true;
}
@Override
public boolean stop() {
eventBus.removeListener(this);
return true;
}
@Override
public void setMessageValidator(ClientId c, MessageValidator v) {
validators.put(c, v);
getMessagesToValidate(c);
}
private void getMessagesToValidate(final ClientId c) {
dbExecutor.execute(new Runnable() {
public void run() {
try {
// TODO: Don't do all of this in a single DB task
for (MessageId id : db.getMessagesToValidate(c)) {
try {
Message m = parseMessage(id, db.getRawMessage(id));
validateMessage(m, c);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before validation");
}
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
private Message parseMessage(MessageId id, byte[] raw) {
if (raw.length <= MESSAGE_HEADER_LENGTH)
throw new IllegalArgumentException();
byte[] groupId = new byte[UniqueId.LENGTH];
System.arraycopy(raw, 0, groupId, 0, UniqueId.LENGTH);
long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH);
return new Message(id, new GroupId(groupId), timestamp, raw);
}
private void validateMessage(final Message m, final ClientId c) {
cryptoExecutor.execute(new Runnable() {
public void run() {
MessageValidator v = validators.get(c);
if (v == null) {
LOG.warning("No validator");
} else {
Metadata meta = v.validateMessage(m);
storeValidationResult(m, c, meta);
}
}
});
}
private void storeValidationResult(final Message m, final ClientId c,
final Metadata meta) {
dbExecutor.execute(new Runnable() {
public void run() {
try {
if (meta == null) {
db.setMessageValidity(m, c, false);
} else {
db.mergeMessageMetadata(m.getId(), meta);
db.setMessageValidity(m, c, true);
}
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
@Override
public void eventOccurred(Event e) {
if (e instanceof MessageAddedEvent) {
MessageAddedEvent m = (MessageAddedEvent) e;
// Validate the message if it wasn't created locally
if (m.getContactId() != null) loadClientId(m.getMessage());
}
}
private void loadClientId(final Message m) {
dbExecutor.execute(new Runnable() {
public void run() {
try {
ClientId c = db.getGroup(m.getGroupId()).getClientId();
validateMessage(m, c);
} catch (NoSuchSubscriptionException e) {
LOG.info("Group removed before validation");
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
}