Changed the root package from net.sf.briar to org.briarproject.

This commit is contained in:
akwizgran
2014-01-08 16:18:30 +00:00
parent dce70f487c
commit 832476412c
427 changed files with 2507 additions and 2507 deletions

View File

@@ -0,0 +1,56 @@
package org.briarproject.messaging;
import static org.briarproject.api.messaging.Types.AUTHOR;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.inject.Inject;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorFactory;
import org.briarproject.api.AuthorId;
import org.briarproject.api.LocalAuthor;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.serial.Writer;
import org.briarproject.api.serial.WriterFactory;
class AuthorFactoryImpl implements AuthorFactory {
private final CryptoComponent crypto;
private final WriterFactory writerFactory;
@Inject
AuthorFactoryImpl(CryptoComponent crypto, WriterFactory writerFactory) {
this.crypto = crypto;
this.writerFactory = writerFactory;
}
public Author createAuthor(String name, byte[] publicKey) {
return new Author(getId(name, publicKey), name, publicKey);
}
public LocalAuthor createLocalAuthor(String name, byte[] publicKey,
byte[] privateKey) {
return new LocalAuthor(getId(name, publicKey), name, publicKey,
privateKey);
}
private AuthorId getId(String name, byte[] publicKey) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Writer w = writerFactory.createWriter(out);
try {
w.writeStructStart(AUTHOR);
w.writeString(name);
w.writeBytes(publicKey);
w.writeStructEnd();
} catch(IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException();
}
MessageDigest messageDigest = crypto.getMessageDigest();
messageDigest.update(out.toByteArray());
return new AuthorId(messageDigest.digest());
}
}

View File

@@ -0,0 +1,40 @@
package org.briarproject.messaging;
import static org.briarproject.api.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
import static org.briarproject.api.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
import static org.briarproject.api.messaging.Types.AUTHOR;
import java.io.IOException;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorId;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.serial.DigestingConsumer;
import org.briarproject.api.serial.Reader;
import org.briarproject.api.serial.StructReader;
class AuthorReader implements StructReader<Author> {
private final MessageDigest messageDigest;
AuthorReader(CryptoComponent crypto) {
messageDigest = crypto.getMessageDigest();
}
public Author readStruct(Reader r) throws IOException {
// Set up the reader
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
r.addConsumer(digesting);
// Read and digest the data
r.readStructStart(AUTHOR);
String name = r.readString(MAX_AUTHOR_NAME_LENGTH);
byte[] publicKey = r.readBytes(MAX_PUBLIC_KEY_LENGTH);
r.readStructEnd();
// Reset the reader
r.removeConsumer(digesting);
// Build and return the author
AuthorId id = new AuthorId(messageDigest.digest());
return new Author(id, name, publicKey);
}
}

View File

@@ -0,0 +1,53 @@
package org.briarproject.messaging;
import static org.briarproject.api.messaging.MessagingConstants.GROUP_SALT_LENGTH;
import static org.briarproject.api.messaging.Types.GROUP;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.inject.Inject;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.GroupFactory;
import org.briarproject.api.messaging.GroupId;
import org.briarproject.api.serial.Writer;
import org.briarproject.api.serial.WriterFactory;
class GroupFactoryImpl implements GroupFactory {
private final CryptoComponent crypto;
private final WriterFactory writerFactory;
@Inject
GroupFactoryImpl(CryptoComponent crypto, WriterFactory writerFactory) {
this.crypto = crypto;
this.writerFactory = writerFactory;
}
public Group createGroup(String name) {
byte[] salt = new byte[GROUP_SALT_LENGTH];
crypto.getSecureRandom().nextBytes(salt);
return createGroup(name, salt);
}
public Group createGroup(String name, byte[] salt) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Writer w = writerFactory.createWriter(out);
try {
w.writeStructStart(GROUP);
w.writeString(name);
w.writeBytes(salt);
w.writeStructEnd();
} catch(IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException();
}
MessageDigest messageDigest = crypto.getMessageDigest();
messageDigest.update(out.toByteArray());
GroupId id = new GroupId(messageDigest.digest());
return new Group(id, name, salt);
}
}

View File

@@ -0,0 +1,40 @@
package org.briarproject.messaging;
import static org.briarproject.api.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_GROUP_NAME_LENGTH;
import static org.briarproject.api.messaging.Types.GROUP;
import java.io.IOException;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.GroupId;
import org.briarproject.api.serial.DigestingConsumer;
import org.briarproject.api.serial.Reader;
import org.briarproject.api.serial.StructReader;
class GroupReader implements StructReader<Group> {
private final MessageDigest messageDigest;
GroupReader(CryptoComponent crypto) {
messageDigest = crypto.getMessageDigest();
}
public Group readStruct(Reader r) throws IOException {
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
// Read and digest the data
r.addConsumer(digesting);
r.readStructStart(GROUP);
String name = r.readString(MAX_GROUP_NAME_LENGTH);
byte[] publicKey = null;
if(r.hasNull()) r.readNull();
else publicKey = r.readBytes(MAX_PUBLIC_KEY_LENGTH);
r.readStructEnd();
r.removeConsumer(digesting);
// Build and return the group
GroupId id = new GroupId(messageDigest.digest());
return new Group(id, name, publicKey);
}
}

View File

@@ -0,0 +1,134 @@
package org.briarproject.messaging;
import static org.briarproject.api.AuthorConstants.MAX_SIGNATURE_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_BODY_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MESSAGE_SALT_LENGTH;
import static org.briarproject.api.messaging.Types.AUTHOR;
import static org.briarproject.api.messaging.Types.GROUP;
import static org.briarproject.api.messaging.Types.MESSAGE;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.inject.Inject;
import org.briarproject.api.Author;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.crypto.PrivateKey;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageFactory;
import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.serial.Consumer;
import org.briarproject.api.serial.CountingConsumer;
import org.briarproject.api.serial.DigestingConsumer;
import org.briarproject.api.serial.SigningConsumer;
import org.briarproject.api.serial.Writer;
import org.briarproject.api.serial.WriterFactory;
class MessageFactoryImpl implements MessageFactory {
private final Signature signature;
private final SecureRandom random;
private final MessageDigest messageDigest;
private final WriterFactory writerFactory;
@Inject
MessageFactoryImpl(CryptoComponent crypto, WriterFactory writerFactory) {
signature = crypto.getSignature();
random = crypto.getSecureRandom();
messageDigest = crypto.getMessageDigest();
this.writerFactory = writerFactory;
}
public Message createAnonymousMessage(MessageId parent, Group group,
String contentType, long timestamp, byte[] body) throws IOException,
GeneralSecurityException {
return createMessage(parent, group, null, null, contentType, timestamp,
body);
}
public Message createPseudonymousMessage(MessageId parent, Group group,
Author author, PrivateKey privateKey, String contentType,
long timestamp, byte[] body) throws IOException,
GeneralSecurityException {
return createMessage(parent, group, author, privateKey, contentType,
timestamp, body);
}
private Message createMessage(MessageId parent, Group group, Author author,
PrivateKey privateKey, String contentType, long timestamp,
byte[] body) throws IOException, GeneralSecurityException {
// Validate the arguments
if((author == null) != (privateKey == null))
throw new IllegalArgumentException();
if(contentType.getBytes("UTF-8").length > MAX_CONTENT_TYPE_LENGTH)
throw new IllegalArgumentException();
if(body.length > MAX_BODY_LENGTH)
throw new IllegalArgumentException();
// Serialise the message to a buffer
ByteArrayOutputStream out = new ByteArrayOutputStream();
Writer w = writerFactory.createWriter(out);
// Initialise the consumers
CountingConsumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
w.addConsumer(counting);
Consumer digestingConsumer = new DigestingConsumer(messageDigest);
w.addConsumer(digestingConsumer);
Consumer signingConsumer = null;
if(privateKey != null) {
signature.initSign(privateKey);
signingConsumer = new SigningConsumer(signature);
w.addConsumer(signingConsumer);
}
// Write the message
w.writeStructStart(MESSAGE);
if(parent == null) w.writeNull();
else w.writeBytes(parent.getBytes());
writeGroup(w, group);
if(author == null) w.writeNull();
else writeAuthor(w, author);
w.writeString(contentType);
w.writeIntAny(timestamp);
byte[] salt = new byte[MESSAGE_SALT_LENGTH];
random.nextBytes(salt);
w.writeBytes(salt);
w.writeBytes(body);
int bodyStart = (int) counting.getCount() - body.length;
// Sign the message with the author's private key, if there is one
if(privateKey == null) {
w.writeNull();
} else {
w.removeConsumer(signingConsumer);
byte[] sig = signature.sign();
if(sig.length > MAX_SIGNATURE_LENGTH)
throw new IllegalArgumentException();
w.writeBytes(sig);
}
w.writeStructEnd();
// Hash the message, including the signature, to get the message ID
w.removeConsumer(digestingConsumer);
MessageId id = new MessageId(messageDigest.digest());
return new MessageImpl(id, parent, group, author, contentType,
timestamp, out.toByteArray(), bodyStart, body.length);
}
private void writeGroup(Writer w, Group g) throws IOException {
w.writeStructStart(GROUP);
w.writeString(g.getName());
w.writeBytes(g.getSalt());
w.writeStructEnd();
}
private void writeAuthor(Writer w, Author a) throws IOException {
w.writeStructStart(AUTHOR);
w.writeString(a.getName());
w.writeBytes(a.getPublicKey());
w.writeStructEnd();
}
}

View File

@@ -0,0 +1,83 @@
package org.briarproject.messaging;
import static org.briarproject.api.messaging.MessagingConstants.MAX_BODY_LENGTH;
import org.briarproject.api.Author;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageId;
/** A simple in-memory implementation of a message. */
class MessageImpl implements Message {
private final MessageId id, parent;
private final Group group;
private final Author author;
private final String contentType;
private final long timestamp;
private final byte[] raw;
private final int bodyStart, bodyLength;
public MessageImpl(MessageId id, MessageId parent, Group group,
Author author, String contentType, long timestamp,
byte[] raw, int bodyStart, int bodyLength) {
if(bodyStart + bodyLength > raw.length)
throw new IllegalArgumentException();
if(bodyLength > MAX_BODY_LENGTH)
throw new IllegalArgumentException();
this.id = id;
this.parent = parent;
this.group = group;
this.author = author;
this.contentType = contentType;
this.timestamp = timestamp;
this.raw = raw;
this.bodyStart = bodyStart;
this.bodyLength = bodyLength;
}
public MessageId getId() {
return id;
}
public MessageId getParent() {
return parent;
}
public Group getGroup() {
return group;
}
public Author getAuthor() {
return author;
}
public String getContentType() {
return contentType;
}
public long getTimestamp() {
return timestamp;
}
public byte[] getSerialised() {
return raw;
}
public int getBodyStart() {
return bodyStart;
}
public int getBodyLength() {
return bodyLength;
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public boolean equals(Object o) {
return o instanceof Message && id.equals(((Message) o).getId());
}
}

View File

@@ -0,0 +1,84 @@
package org.briarproject.messaging;
import static org.briarproject.api.AuthorConstants.MAX_SIGNATURE_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_BODY_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MESSAGE_SALT_LENGTH;
import static org.briarproject.api.messaging.Types.MESSAGE;
import java.io.IOException;
import org.briarproject.api.Author;
import org.briarproject.api.FormatException;
import org.briarproject.api.UniqueId;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.serial.CopyingConsumer;
import org.briarproject.api.serial.CountingConsumer;
import org.briarproject.api.serial.Reader;
import org.briarproject.api.serial.StructReader;
class MessageReader implements StructReader<UnverifiedMessage> {
private final StructReader<Group> groupReader;
private final StructReader<Author> authorReader;
MessageReader(StructReader<Group> groupReader,
StructReader<Author> authorReader) {
this.groupReader = groupReader;
this.authorReader = authorReader;
}
public UnverifiedMessage readStruct(Reader r) throws IOException {
CopyingConsumer copying = new CopyingConsumer();
CountingConsumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
r.addConsumer(copying);
r.addConsumer(counting);
// Read the start of the struct
r.readStructStart(MESSAGE);
// Read the parent's message ID, if there is one
MessageId parent = null;
if(r.hasNull()) {
r.readNull();
} else {
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length < UniqueId.LENGTH) throw new FormatException();
parent = new MessageId(b);
}
// Read the group
Group group = groupReader.readStruct(r);
// Read the author, if there is one
Author author = null;
if(r.hasNull()) r.readNull();
else author = authorReader.readStruct(r);
// Read the content type
String contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
// Read the timestamp
long timestamp = r.readIntAny();
if(timestamp < 0) throw new FormatException();
// Read the salt
byte[] salt = r.readBytes(MESSAGE_SALT_LENGTH);
if(salt.length < MESSAGE_SALT_LENGTH) throw new FormatException();
// Read the message body
byte[] body = r.readBytes(MAX_BODY_LENGTH);
// Record the offset of the body within the message
int bodyStart = (int) counting.getCount() - body.length;
// Record the length of the data covered by the author's signature
int signedLength = (int) counting.getCount();
// Read the author's signature, if there is one
byte[] signature = null;
if(author == null) r.readNull();
else signature = r.readBytes(MAX_SIGNATURE_LENGTH);
// Read the end of the struct
r.readStructEnd();
// The signature will be verified later
r.removeConsumer(counting);
r.removeConsumer(copying);
byte[] raw = copying.getCopy();
return new UnverifiedMessage(parent, group, author, contentType,
timestamp, raw, signature, bodyStart, body.length,
signedLength);
}
}

View File

@@ -0,0 +1,59 @@
package org.briarproject.messaging;
import static org.briarproject.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE;
import java.security.GeneralSecurityException;
import javax.inject.Inject;
import org.briarproject.api.Author;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.KeyParser;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.crypto.PublicKey;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.system.Clock;
class MessageVerifierImpl implements MessageVerifier {
private final CryptoComponent crypto;
private final Clock clock;
private final KeyParser keyParser;
@Inject
MessageVerifierImpl(CryptoComponent crypto, Clock clock) {
this.crypto = crypto;
this.clock = clock;
keyParser = crypto.getSignatureKeyParser();
}
public Message verifyMessage(UnverifiedMessage m)
throws GeneralSecurityException {
MessageDigest messageDigest = crypto.getMessageDigest();
Signature signature = crypto.getSignature();
// Reject the message if it's too far in the future
long now = clock.currentTimeMillis();
if(m.getTimestamp() > now + MAX_CLOCK_DIFFERENCE)
throw new GeneralSecurityException();
// Hash the message to get the message ID
byte[] raw = m.getSerialised();
messageDigest.update(raw);
MessageId id = new MessageId(messageDigest.digest());
// Verify the author's signature, if there is one
Author author = m.getAuthor();
if(author != null) {
PublicKey k = keyParser.parsePublicKey(author.getPublicKey());
signature.initVerify(k);
signature.update(raw, 0, m.getSignedLength());
if(!signature.verify(m.getSignature()))
throw new GeneralSecurityException();
}
return new MessageImpl(id, m.getParent(), m.getGroup(), author,
m.getContentType(), m.getTimestamp(), raw, m.getBodyStart(),
m.getBodyLength());
}
}

View File

@@ -0,0 +1,52 @@
package org.briarproject.messaging;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorFactory;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.GroupFactory;
import org.briarproject.api.messaging.MessageFactory;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.serial.StructReader;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class MessagingModule extends AbstractModule {
protected void configure() {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
bind(GroupFactory.class).to(GroupFactoryImpl.class);
bind(MessageFactory.class).to(MessageFactoryImpl.class);
bind(MessageVerifier.class).to(MessageVerifierImpl.class);
bind(PacketReaderFactory.class).to(PacketReaderFactoryImpl.class);
bind(PacketWriterFactory.class).to(PacketWriterFactoryImpl.class);
}
@Provides
StructReader<Author> getAuthorReader(CryptoComponent crypto) {
return new AuthorReader(crypto);
}
@Provides
StructReader<Group> getGroupReader(CryptoComponent crypto) {
return new GroupReader(crypto);
}
@Provides
StructReader<UnverifiedMessage> getMessageReader(
StructReader<Group> groupReader,
StructReader<Author> authorReader) {
return new MessageReader(groupReader, authorReader);
}
@Provides
StructReader<SubscriptionUpdate> getSubscriptionUpdateReader(
StructReader<Group> groupReader) {
return new SubscriptionUpdateReader(groupReader);
}
}

View File

@@ -0,0 +1,33 @@
package org.briarproject.messaging;
import java.io.InputStream;
import javax.inject.Inject;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.serial.ReaderFactory;
import org.briarproject.api.serial.StructReader;
class PacketReaderFactoryImpl implements PacketReaderFactory {
private final ReaderFactory readerFactory;
private final StructReader<UnverifiedMessage> messageReader;
private final StructReader<SubscriptionUpdate> subscriptionUpdateReader;
@Inject
PacketReaderFactoryImpl(ReaderFactory readerFactory,
StructReader<UnverifiedMessage> messageReader,
StructReader<SubscriptionUpdate> subscriptionUpdateReader) {
this.readerFactory = readerFactory;
this.messageReader = messageReader;
this.subscriptionUpdateReader = subscriptionUpdateReader;
}
public PacketReader createPacketReader(InputStream in) {
return new PacketReaderImpl(readerFactory, messageReader,
subscriptionUpdateReader, in);
}
}

View File

@@ -0,0 +1,257 @@
package org.briarproject.messaging;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTIES_PER_TRANSPORT;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import static org.briarproject.api.messaging.Types.ACK;
import static org.briarproject.api.messaging.Types.MESSAGE;
import static org.briarproject.api.messaging.Types.OFFER;
import static org.briarproject.api.messaging.Types.REQUEST;
import static org.briarproject.api.messaging.Types.RETENTION_ACK;
import static org.briarproject.api.messaging.Types.RETENTION_UPDATE;
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_ACK;
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.messaging.Types.TRANSPORT_ACK;
import static org.briarproject.api.messaging.Types.TRANSPORT_UPDATE;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.UniqueId;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.messaging.Offer;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.Request;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.serial.Consumer;
import org.briarproject.api.serial.CountingConsumer;
import org.briarproject.api.serial.Reader;
import org.briarproject.api.serial.ReaderFactory;
import org.briarproject.api.serial.StructReader;
// This class is not thread-safe
class PacketReaderImpl implements PacketReader {
private final StructReader<UnverifiedMessage> messageReader;
private final StructReader<SubscriptionUpdate> subscriptionUpdateReader;
private final Reader r;
PacketReaderImpl(ReaderFactory readerFactory,
StructReader<UnverifiedMessage> messageReader,
StructReader<SubscriptionUpdate> subscriptionUpdateReader,
InputStream in) {
this.messageReader = messageReader;
this.subscriptionUpdateReader = subscriptionUpdateReader;
r = readerFactory.createReader(in);
}
public boolean eof() throws IOException {
return r.eof();
}
public boolean hasAck() throws IOException {
return r.hasStruct(ACK);
}
public Ack readAck() throws IOException {
// Set up the reader
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
r.addConsumer(counting);
// Read the start of the struct
r.readStructStart(ACK);
// Read the message IDs
List<MessageId> acked = new ArrayList<MessageId>();
r.readListStart();
while(!r.hasListEnd()) {
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length != UniqueId.LENGTH)
throw new FormatException();
acked.add(new MessageId(b));
}
if(acked.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the struct
r.readStructEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the ack
return new Ack(Collections.unmodifiableList(acked));
}
public boolean hasMessage() throws IOException {
return r.hasStruct(MESSAGE);
}
public UnverifiedMessage readMessage() throws IOException {
return messageReader.readStruct(r);
}
public boolean hasOffer() throws IOException {
return r.hasStruct(OFFER);
}
public Offer readOffer() throws IOException {
// Set up the reader
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
r.addConsumer(counting);
// Read the start of the struct
r.readStructStart(OFFER);
// Read the message IDs
List<MessageId> offered = new ArrayList<MessageId>();
r.readListStart();
while(!r.hasListEnd()) {
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length != UniqueId.LENGTH)
throw new FormatException();
offered.add(new MessageId(b));
}
if(offered.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the struct
r.readStructEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the offer
return new Offer(Collections.unmodifiableList(offered));
}
public boolean hasRequest() throws IOException {
return r.hasStruct(REQUEST);
}
public Request readRequest() throws IOException {
// Set up the reader
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
r.addConsumer(counting);
// Read the start of the struct
r.readStructStart(REQUEST);
// Read the message IDs
List<MessageId> requested = new ArrayList<MessageId>();
r.readListStart();
while(!r.hasListEnd()) {
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length != UniqueId.LENGTH)
throw new FormatException();
requested.add(new MessageId(b));
}
if(requested.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the struct
r.readStructEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the request
return new Request(Collections.unmodifiableList(requested));
}
public boolean hasRetentionAck() throws IOException {
return r.hasStruct(RETENTION_ACK);
}
public RetentionAck readRetentionAck() throws IOException {
r.readStructStart(RETENTION_ACK);
long version = r.readIntAny();
if(version < 0) throw new FormatException();
r.readStructEnd();
return new RetentionAck(version);
}
public boolean hasRetentionUpdate() throws IOException {
return r.hasStruct(RETENTION_UPDATE);
}
public RetentionUpdate readRetentionUpdate() throws IOException {
r.readStructStart(RETENTION_UPDATE);
long retention = r.readIntAny();
if(retention < 0) throw new FormatException();
long version = r.readIntAny();
if(version < 0) throw new FormatException();
r.readStructEnd();
return new RetentionUpdate(retention, version);
}
public boolean hasSubscriptionAck() throws IOException {
return r.hasStruct(SUBSCRIPTION_ACK);
}
public SubscriptionAck readSubscriptionAck() throws IOException {
r.readStructStart(SUBSCRIPTION_ACK);
long version = r.readIntAny();
if(version < 0) throw new FormatException();
r.readStructEnd();
return new SubscriptionAck(version);
}
public boolean hasSubscriptionUpdate() throws IOException {
return r.hasStruct(SUBSCRIPTION_UPDATE);
}
public SubscriptionUpdate readSubscriptionUpdate() throws IOException {
return subscriptionUpdateReader.readStruct(r);
}
public boolean hasTransportAck() throws IOException {
return r.hasStruct(TRANSPORT_ACK);
}
public TransportAck readTransportAck() throws IOException {
r.readStructStart(TRANSPORT_ACK);
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length < UniqueId.LENGTH) throw new FormatException();
long version = r.readIntAny();
if(version < 0) throw new FormatException();
r.readStructEnd();
return new TransportAck(new TransportId(b), version);
}
public boolean hasTransportUpdate() throws IOException {
return r.hasStruct(TRANSPORT_UPDATE);
}
public TransportUpdate readTransportUpdate() throws IOException {
// Set up the reader
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
r.addConsumer(counting);
// Read the start of the struct
r.readStructStart(TRANSPORT_UPDATE);
// Read the transport ID
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length < UniqueId.LENGTH) throw new FormatException();
TransportId id = new TransportId(b);
// Read the transport properties
Map<String, String> p = new HashMap<String, String>();
r.readMapStart();
for(int i = 0; !r.hasMapEnd(); i++) {
if(i == MAX_PROPERTIES_PER_TRANSPORT)
throw new FormatException();
String key = r.readString(MAX_PROPERTY_LENGTH);
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
}
r.readMapEnd();
// Read the version number
long version = r.readIntAny();
if(version < 0) throw new FormatException();
// Read the end of the struct
r.readStructEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the transport update
return new TransportUpdate(id, new TransportProperties(p), version);
}
}

View File

@@ -0,0 +1,28 @@
package org.briarproject.messaging;
import java.io.OutputStream;
import javax.inject.Inject;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.serial.SerialComponent;
import org.briarproject.api.serial.WriterFactory;
class PacketWriterFactoryImpl implements PacketWriterFactory {
private final SerialComponent serial;
private final WriterFactory writerFactory;
@Inject
PacketWriterFactoryImpl(SerialComponent serial,
WriterFactory writerFactory) {
this.serial = serial;
this.writerFactory = writerFactory;
}
public PacketWriter createPacketWriter(OutputStream out,
boolean flush) {
return new PacketWriterImpl(serial, writerFactory, out, flush);
}
}

View File

@@ -0,0 +1,164 @@
package org.briarproject.messaging;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import static org.briarproject.api.messaging.Types.ACK;
import static org.briarproject.api.messaging.Types.GROUP;
import static org.briarproject.api.messaging.Types.OFFER;
import static org.briarproject.api.messaging.Types.REQUEST;
import static org.briarproject.api.messaging.Types.RETENTION_ACK;
import static org.briarproject.api.messaging.Types.RETENTION_UPDATE;
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_ACK;
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.messaging.Types.TRANSPORT_ACK;
import static org.briarproject.api.messaging.Types.TRANSPORT_UPDATE;
import java.io.IOException;
import java.io.OutputStream;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.messaging.Offer;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.Request;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.serial.SerialComponent;
import org.briarproject.api.serial.Writer;
import org.briarproject.api.serial.WriterFactory;
// This class is not thread-safe
class PacketWriterImpl implements PacketWriter {
private final SerialComponent serial;
private final OutputStream out;
private final boolean flush;
private final Writer w;
PacketWriterImpl(SerialComponent serial, WriterFactory writerFactory,
OutputStream out, boolean flush) {
this.serial = serial;
this.out = out;
this.flush = flush;
w = writerFactory.createWriter(out);
}
public int getMaxMessagesForRequest(long capacity) {
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
int overhead = serial.getSerialisedStructStartLength(ACK)
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength()
+ serial.getSerialisedStructEndLength();
int idLength = serial.getSerialisedUniqueIdLength();
return (packet - overhead) / idLength;
}
public int getMaxMessagesForOffer(long capacity) {
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
int overhead = serial.getSerialisedStructStartLength(OFFER)
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength()
+ serial.getSerialisedStructEndLength();
int idLength = serial.getSerialisedUniqueIdLength();
return (packet - overhead) / idLength;
}
public void writeAck(Ack a) throws IOException {
w.writeStructStart(ACK);
w.writeListStart();
for(MessageId m : a.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd();
w.writeStructEnd();
if(flush) out.flush();
}
public void writeMessage(byte[] raw) throws IOException {
out.write(raw);
if(flush) out.flush();
}
public void writeOffer(Offer o) throws IOException {
w.writeStructStart(OFFER);
w.writeListStart();
for(MessageId m : o.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd();
w.writeStructEnd();
if(flush) out.flush();
}
public void writeRequest(Request r) throws IOException {
w.writeStructStart(REQUEST);
w.writeListStart();
for(MessageId m : r.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd();
w.writeStructEnd();
if(flush) out.flush();
}
public void writeRetentionAck(RetentionAck a) throws IOException {
w.writeStructStart(RETENTION_ACK);
w.writeIntAny(a.getVersion());
w.writeStructEnd();
if(flush) out.flush();
}
public void writeRetentionUpdate(RetentionUpdate u) throws IOException {
w.writeStructStart(RETENTION_UPDATE);
w.writeIntAny(u.getRetentionTime());
w.writeIntAny(u.getVersion());
w.writeStructEnd();
if(flush) out.flush();
}
public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
w.writeStructStart(SUBSCRIPTION_ACK);
w.writeIntAny(a.getVersion());
w.writeStructEnd();
if(flush) out.flush();
}
public void writeSubscriptionUpdate(SubscriptionUpdate u)
throws IOException {
w.writeStructStart(SUBSCRIPTION_UPDATE);
w.writeListStart();
for(Group g : u.getGroups()) {
w.writeStructStart(GROUP);
w.writeString(g.getName());
w.writeBytes(g.getSalt());
w.writeStructEnd();
}
w.writeListEnd();
w.writeIntAny(u.getVersion());
w.writeStructEnd();
if(flush) out.flush();
}
public void writeTransportAck(TransportAck a) throws IOException {
w.writeStructStart(TRANSPORT_ACK);
w.writeBytes(a.getId().getBytes());
w.writeIntAny(a.getVersion());
w.writeStructEnd();
if(flush) out.flush();
}
public void writeTransportUpdate(TransportUpdate u) throws IOException {
w.writeStructStart(TRANSPORT_UPDATE);
w.writeBytes(u.getId().getBytes());
w.writeMap(u.getProperties());
w.writeIntAny(u.getVersion());
w.writeStructEnd();
if(flush) out.flush();
}
public void flush() throws IOException {
out.flush();
}
public void close() throws IOException {
out.close();
}
}

View File

@@ -0,0 +1,51 @@
package org.briarproject.messaging;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_SUBSCRIPTIONS;
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_UPDATE;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.briarproject.api.FormatException;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.serial.Consumer;
import org.briarproject.api.serial.CountingConsumer;
import org.briarproject.api.serial.Reader;
import org.briarproject.api.serial.StructReader;
class SubscriptionUpdateReader implements StructReader<SubscriptionUpdate> {
private final StructReader<Group> groupReader;
SubscriptionUpdateReader(StructReader<Group> groupReader) {
this.groupReader = groupReader;
}
public SubscriptionUpdate readStruct(Reader r) throws IOException {
// Set up the reader
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
r.addConsumer(counting);
// Read the start of the struct
r.readStructStart(SUBSCRIPTION_UPDATE);
// Read the subscriptions
List<Group> groups = new ArrayList<Group>();
r.readListStart();
for(int i = 0; i < MAX_SUBSCRIPTIONS && !r.hasListEnd(); i++)
groups.add(groupReader.readStruct(r));
r.readListEnd();
// Read the version number
long version = r.readIntAny();
if(version < 0) throw new FormatException();
// Read the end of the struct
r.readStructEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the subscription update
groups = Collections.unmodifiableList(groups);
return new SubscriptionUpdate(groups, version);
}
}

View File

@@ -0,0 +1,879 @@
package org.briarproject.messaging.duplex;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageExpiredEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.RemoteRetentionTimeUpdatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.Offer;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.Request;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionReader;
import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriter;
import org.briarproject.api.transport.ConnectionWriterFactory;
import org.briarproject.util.ByteUtils;
abstract class DuplexConnection implements EventListener {
private static final Logger LOG =
Logger.getLogger(DuplexConnection.class.getName());
private static final Runnable CLOSE = new Runnable() {
public void run() {}
};
private static final Runnable DIE = new Runnable() {
public void run() {}
};
protected final DatabaseComponent db;
protected final ConnectionRegistry connRegistry;
protected final ConnectionReaderFactory connReaderFactory;
protected final ConnectionWriterFactory connWriterFactory;
protected final PacketReaderFactory packetReaderFactory;
protected final PacketWriterFactory packetWriterFactory;
protected final ConnectionContext ctx;
protected final DuplexTransportConnection transport;
protected final ContactId contactId;
protected final TransportId transportId;
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final long maxLatency;
private final AtomicBoolean disposed;
private final BlockingQueue<Runnable> writerTasks;
private volatile PacketWriter writer = null;
DuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory, ConnectionContext ctx,
DuplexTransportConnection transport) {
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
this.ctx = ctx;
this.transport = transport;
contactId = ctx.getContactId();
transportId = ctx.getTransportId();
maxLatency = transport.getMaxLatency();
disposed = new AtomicBoolean(false);
writerTasks = new LinkedBlockingQueue<Runnable>();
}
protected abstract ConnectionReader createConnectionReader()
throws IOException;
protected abstract ConnectionWriter createConnectionWriter()
throws IOException;
public void eventOccurred(Event e) {
if(e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if(contactId.equals(c.getContactId())) writerTasks.add(CLOSE);
} else if(e instanceof MessageAddedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof MessageExpiredEvent) {
dbExecutor.execute(new GenerateRetentionUpdate());
} else if(e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
if(l.getAffectedContacts().contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateOffer());
}
} else if(e instanceof LocalTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdates());
} else if(e instanceof MessageRequestedEvent) {
if(((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch());
} else if(e instanceof MessageToAckEvent) {
if(((MessageToAckEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateAck());
} else if(e instanceof MessageToRequestEvent) {
if(((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest());
} else if(e instanceof RemoteRetentionTimeUpdatedEvent) {
dbExecutor.execute(new GenerateRetentionAck());
} else if(e instanceof RemoteSubscriptionsUpdatedEvent) {
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof RemoteTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportAcks());
}
}
void read() {
try {
InputStream in = createConnectionReader().getInputStream();
PacketReader reader = packetReaderFactory.createPacketReader(in);
if(LOG.isLoggable(INFO)) LOG.info("Starting to read");
while(!reader.eof()) {
if(reader.hasAck()) {
Ack a = reader.readAck();
if(LOG.isLoggable(INFO)) LOG.info("Received ack");
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage();
if(LOG.isLoggable(INFO)) LOG.info("Received message");
cryptoExecutor.execute(new VerifyMessage(m));
} else if(reader.hasOffer()) {
Offer o = reader.readOffer();
if(LOG.isLoggable(INFO)) LOG.info("Received offer");
dbExecutor.execute(new ReceiveOffer(o));
} else if(reader.hasRequest()) {
Request r = reader.readRequest();
if(LOG.isLoggable(INFO)) LOG.info("Received request");
dbExecutor.execute(new ReceiveRequest(r));
} else if(reader.hasRetentionAck()) {
RetentionAck a = reader.readRetentionAck();
if(LOG.isLoggable(INFO)) LOG.info("Received retention ack");
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if(reader.hasRetentionUpdate()) {
RetentionUpdate u = reader.readRetentionUpdate();
if(LOG.isLoggable(INFO))
LOG.info("Received retention update");
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if(reader.hasSubscriptionAck()) {
SubscriptionAck a = reader.readSubscriptionAck();
if(LOG.isLoggable(INFO))
LOG.info("Received subscription ack");
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = reader.readSubscriptionUpdate();
if(LOG.isLoggable(INFO))
LOG.info("Received subscription update");
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportAck()) {
TransportAck a = reader.readTransportAck();
if(LOG.isLoggable(INFO))
LOG.info("Received transport ack");
dbExecutor.execute(new ReceiveTransportAck(a));
} else if(reader.hasTransportUpdate()) {
TransportUpdate u = reader.readTransportUpdate();
if(LOG.isLoggable(INFO))
LOG.info("Received transport update");
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
}
if(LOG.isLoggable(INFO)) LOG.info("Finished reading");
writerTasks.add(CLOSE);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
writerTasks.add(DIE);
}
}
void write() {
connRegistry.registerConnection(contactId, transportId);
db.addListener(this);
try {
OutputStream out = createConnectionWriter().getOutputStream();
writer = packetWriterFactory.createPacketWriter(out,
transport.shouldFlush());
if(LOG.isLoggable(INFO)) LOG.info("Starting to write");
// Send the initial packets
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
dbExecutor.execute(new GenerateRequest());
// Main loop
Runnable task = null;
while(true) {
if(LOG.isLoggable(INFO))
LOG.info("Waiting for something to write");
task = writerTasks.take();
if(task == CLOSE || task == DIE) break;
task.run();
}
if(LOG.isLoggable(INFO)) LOG.info("Finished writing");
if(task == CLOSE) {
writer.flush();
writer.close();
dispose(false, true);
} else {
dispose(true, true);
}
} catch(InterruptedException e) {
if(LOG.isLoggable(INFO))
LOG.info("Interrupted while waiting for task");
dispose(true, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
db.removeListener(this);
connRegistry.unregisterConnection(contactId, transportId);
}
private void dispose(boolean exception, boolean recognised) {
if(disposed.getAndSet(true)) return;
if(LOG.isLoggable(INFO))
LOG.info("Disposing: " + exception + ", " + recognised);
ByteUtils.erase(ctx.getSecret());
try {
transport.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
// This task runs on a database thread
private class ReceiveAck implements Runnable {
private final Ack ack;
private ReceiveAck(Ack ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveAck(contactId, ack);
if(LOG.isLoggable(INFO)) LOG.info("DB received ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a crypto thread
private class VerifyMessage implements Runnable {
private final UnverifiedMessage message;
private VerifyMessage(UnverifiedMessage message) {
this.message = message;
}
public void run() {
try {
Message m = messageVerifier.verifyMessage(message);
if(LOG.isLoggable(INFO)) LOG.info("Verified message");
dbExecutor.execute(new ReceiveMessage(m));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveMessage implements Runnable {
private final Message message;
private ReceiveMessage(Message message) {
this.message = message;
}
public void run() {
try {
db.receiveMessage(contactId, message);
if(LOG.isLoggable(INFO)) LOG.info("DB received message");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveOffer implements Runnable {
private final Offer offer;
private ReceiveOffer(Offer offer) {
this.offer = offer;
}
public void run() {
try {
db.receiveOffer(contactId, offer);
if(LOG.isLoggable(INFO)) LOG.info("DB received offer");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveRequest implements Runnable {
private final Request request;
private ReceiveRequest(Request request) {
this.request = request;
}
public void run() {
try {
db.receiveRequest(contactId, request);
if(LOG.isLoggable(INFO)) LOG.info("DB received request");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
private ReceiveRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveRetentionAck(contactId, ack);
if(LOG.isLoggable(INFO)) LOG.info("DB received retention ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private ReceiveRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveRetentionUpdate(contactId, update);
if(LOG.isLoggable(INFO))
LOG.info("DB received retention update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private ReceiveSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveSubscriptionAck(contactId, ack);
if(LOG.isLoggable(INFO))
LOG.info("DB received subscription ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
private ReceiveSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveSubscriptionUpdate(contactId, update);
if(LOG.isLoggable(INFO))
LOG.info("DB received subscription update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
private ReceiveTransportAck(TransportAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveTransportAck(contactId, ack);
if(LOG.isLoggable(INFO)) LOG.info("DB received transport ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update;
private ReceiveTransportUpdate(TransportUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveTransportUpdate(contactId, update);
if(LOG.isLoggable(INFO))
LOG.info("DB received transport update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class GenerateAck implements Runnable {
public void run() {
assert writer != null;
int maxMessages = writer.getMaxMessagesForRequest(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if(a != null) writerTasks.add(new WriteAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteAck implements Runnable {
private final Ack ack;
private WriteAck(Ack ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeAck(ack);
if(LOG.isLoggable(INFO)) LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateBatch implements Runnable {
public void run() {
assert writer != null;
try {
Collection<byte[]> b = db.generateRequestedBatch(contactId,
MAX_PACKET_LENGTH, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if(b != null) writerTasks.add(new WriteBatch(b));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteBatch implements Runnable {
private final Collection<byte[]> batch;
private WriteBatch(Collection<byte[]> batch) {
this.batch = batch;
}
public void run() {
assert writer != null;
try {
for(byte[] raw : batch) writer.writeMessage(raw);
if(LOG.isLoggable(INFO)) LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateOffer implements Runnable {
public void run() {
assert writer != null;
int maxMessages = writer.getMaxMessagesForOffer(Long.MAX_VALUE);
try {
Offer o = db.generateOffer(contactId, maxMessages, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if(o != null) writerTasks.add(new WriteOffer(o));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteOffer implements Runnable {
private final Offer offer;
private WriteOffer(Offer offer) {
this.offer = offer;
}
public void run() {
assert writer != null;
try {
writer.writeOffer(offer);
if(LOG.isLoggable(INFO)) LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateRequest implements Runnable {
public void run() {
assert writer != null;
int maxMessages = writer.getMaxMessagesForRequest(Long.MAX_VALUE);
try {
Request r = db.generateRequest(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if(r != null) writerTasks.add(new WriteRequest(r));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteRequest implements Runnable {
private final Request request;
private WriteRequest(Request request) {
this.request = request;
}
public void run() {
assert writer != null;
try {
writer.writeRequest(request);
if(LOG.isLoggable(INFO)) LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if(a != null) writerTasks.add(new WriteRetentionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements Runnable {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeRetentionAck(ack);
if(LOG.isLoggable(INFO)) LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if(u != null) writerTasks.add(new WriteRetentionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
assert writer != null;
try {
writer.writeRetentionUpdate(update);
if(LOG.isLoggable(INFO)) LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if(a != null) writerTasks.add(new WriteSubscriptionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeSubscriptionAck(ack);
if(LOG.isLoggable(INFO)) LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() {
assert writer != null;
try {
writer.writeSubscriptionUpdate(update);
if(LOG.isLoggable(INFO)) LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if(acks != null) writerTasks.add(new WriteTransportAcks(acks));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements Runnable {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() {
assert writer != null;
try {
for(TransportAck a : acks) writer.writeTransportAck(a);
if(LOG.isLoggable(INFO)) LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if(t != null) writerTasks.add(new WriteTransportUpdates(t));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates implements Runnable {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() {
assert writer != null;
try {
for(TransportUpdate u : updates) writer.writeTransportUpdate(u);
if(LOG.isLoggable(INFO)) LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
}

View File

@@ -0,0 +1,107 @@
package org.briarproject.messaging.duplex;
import static java.util.logging.Level.WARNING;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.crypto.KeyManager;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriterFactory;
class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
private static final Logger LOG =
Logger.getLogger(DuplexConnectionFactoryImpl.class.getName());
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final KeyManager keyManager;
private final ConnectionRegistry connRegistry;
private final ConnectionReaderFactory connReaderFactory;
private final ConnectionWriterFactory connWriterFactory;
private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory;
@Inject
DuplexConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
KeyManager keyManager, ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) {
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.keyManager = keyManager;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
}
public void createIncomingConnection(ConnectionContext ctx,
DuplexTransportConnection transport) {
final DuplexConnection conn = new IncomingDuplexConnection(dbExecutor,
cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
Runnable write = new Runnable() {
public void run() {
conn.write();
}
};
new Thread(write, "DuplexConnectionWriter").start();
Runnable read = new Runnable() {
public void run() {
conn.read();
}
};
new Thread(read, "DuplexConnectionReader").start();
}
public void createOutgoingConnection(ContactId c, TransportId t,
DuplexTransportConnection transport) {
ConnectionContext ctx = keyManager.getConnectionContext(c, t);
if(ctx == null) {
if(LOG.isLoggable(WARNING))
LOG.warning("Could not create outgoing connection context");
return;
}
final DuplexConnection conn = new OutgoingDuplexConnection(dbExecutor,
cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
Runnable write = new Runnable() {
public void run() {
conn.write();
}
};
new Thread(write, "DuplexConnectionWriter").start();
Runnable read = new Runnable() {
public void run() {
conn.read();
}
};
new Thread(read, "DuplexConnectionReader").start();
}
}

View File

@@ -0,0 +1,15 @@
package org.briarproject.messaging.duplex;
import javax.inject.Singleton;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import com.google.inject.AbstractModule;
public class DuplexMessagingModule extends AbstractModule {
protected void configure() {
bind(DuplexConnectionFactory.class).to(
DuplexConnectionFactoryImpl.class).in(Singleton.class);
}
}

View File

@@ -0,0 +1,50 @@
package org.briarproject.messaging.duplex;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionReader;
import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriter;
import org.briarproject.api.transport.ConnectionWriterFactory;
class IncomingDuplexConnection extends DuplexConnection {
IncomingDuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory,
ConnectionContext ctx, DuplexTransportConnection transport) {
super(dbExecutor, cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
}
@Override
protected ConnectionReader createConnectionReader() throws IOException {
InputStream in = transport.getInputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connReaderFactory.createConnectionReader(in, maxFrameLength,
ctx, true, true);
}
@Override
protected ConnectionWriter createConnectionWriter() throws IOException {
OutputStream out = transport.getOutputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connWriterFactory.createConnectionWriter(out, maxFrameLength,
Long.MAX_VALUE, ctx, true, false);
}
}

View File

@@ -0,0 +1,50 @@
package org.briarproject.messaging.duplex;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionReader;
import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriter;
import org.briarproject.api.transport.ConnectionWriterFactory;
class OutgoingDuplexConnection extends DuplexConnection {
OutgoingDuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory, ConnectionContext ctx,
DuplexTransportConnection transport) {
super(dbExecutor, cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
}
@Override
protected ConnectionReader createConnectionReader() throws IOException {
InputStream in = transport.getInputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connReaderFactory.createConnectionReader(in, maxFrameLength,
ctx, false, false);
}
@Override
protected ConnectionWriter createConnectionWriter() throws IOException {
OutputStream out = transport.getOutputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connWriterFactory.createConnectionWriter(out, maxFrameLength,
Long.MAX_VALUE, ctx, false, true);
}
}

View File

@@ -0,0 +1,280 @@
package org.briarproject.messaging.simplex;
import static java.util.logging.Level.WARNING;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionReader;
import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.util.ByteUtils;
class IncomingSimplexConnection {
private static final Logger LOG =
Logger.getLogger(IncomingSimplexConnection.class.getName());
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final ConnectionRegistry connRegistry;
private final ConnectionReaderFactory connReaderFactory;
private final PacketReaderFactory packetReaderFactory;
private final ConnectionContext ctx;
private final SimplexTransportReader transport;
private final ContactId contactId;
private final TransportId transportId;
IncomingSimplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
PacketReaderFactory packetReaderFactory, ConnectionContext ctx,
SimplexTransportReader transport) {
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.packetReaderFactory = packetReaderFactory;
this.ctx = ctx;
this.transport = transport;
contactId = ctx.getContactId();
transportId = ctx.getTransportId();
}
void read() {
connRegistry.registerConnection(contactId, transportId);
try {
InputStream in = transport.getInputStream();
int maxFrameLength = transport.getMaxFrameLength();
ConnectionReader conn = connReaderFactory.createConnectionReader(in,
maxFrameLength, ctx, true, true);
in = conn.getInputStream();
PacketReader reader = packetReaderFactory.createPacketReader(in);
// Read packets until EOF
while(!reader.eof()) {
if(reader.hasAck()) {
Ack a = reader.readAck();
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage();
cryptoExecutor.execute(new VerifyMessage(m));
} else if(reader.hasRetentionAck()) {
RetentionAck a = reader.readRetentionAck();
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if(reader.hasRetentionUpdate()) {
RetentionUpdate u = reader.readRetentionUpdate();
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if(reader.hasSubscriptionAck()) {
SubscriptionAck a = reader.readSubscriptionAck();
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportAck()) {
TransportAck a = reader.readTransportAck();
dbExecutor.execute(new ReceiveTransportAck(a));
} else if(reader.hasTransportUpdate()) {
TransportUpdate u = reader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
}
dispose(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
} finally {
connRegistry.unregisterConnection(contactId, transportId);
}
}
private void dispose(boolean exception, boolean recognised) {
ByteUtils.erase(ctx.getSecret());
try {
transport.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private class ReceiveAck implements Runnable {
private final Ack ack;
private ReceiveAck(Ack ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveAck(contactId, ack);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class VerifyMessage implements Runnable {
private final UnverifiedMessage message;
private VerifyMessage(UnverifiedMessage message) {
this.message = message;
}
public void run() {
try {
Message m = messageVerifier.verifyMessage(message);
dbExecutor.execute(new ReceiveMessage(m));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveMessage implements Runnable {
private final Message message;
private ReceiveMessage(Message message) {
this.message = message;
}
public void run() {
try {
db.receiveMessage(contactId, message);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
private ReceiveRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveRetentionAck(contactId, ack);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private ReceiveRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveRetentionUpdate(contactId, update);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private ReceiveSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveSubscriptionAck(contactId, ack);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
private ReceiveSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveSubscriptionUpdate(contactId, update);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
private ReceiveTransportAck(TransportAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveTransportAck(contactId, ack);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update;
private ReceiveTransportUpdate(TransportUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveTransportUpdate(contactId, update);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
}

View File

@@ -0,0 +1,187 @@
package org.briarproject.messaging.simplex;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriter;
import org.briarproject.api.transport.ConnectionWriterFactory;
import org.briarproject.util.ByteUtils;
class OutgoingSimplexConnection {
private static final Logger LOG =
Logger.getLogger(OutgoingSimplexConnection.class.getName());
private final DatabaseComponent db;
private final ConnectionRegistry connRegistry;
private final ConnectionWriterFactory connWriterFactory;
private final PacketWriterFactory packetWriterFactory;
private final ConnectionContext ctx;
private final SimplexTransportWriter transport;
private final ContactId contactId;
private final TransportId transportId;
private final long maxLatency;
OutgoingSimplexConnection(DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionWriterFactory connWriterFactory,
PacketWriterFactory packetWriterFactory, ConnectionContext ctx,
SimplexTransportWriter transport) {
this.db = db;
this.connRegistry = connRegistry;
this.connWriterFactory = connWriterFactory;
this.packetWriterFactory = packetWriterFactory;
this.ctx = ctx;
this.transport = transport;
contactId = ctx.getContactId();
transportId = ctx.getTransportId();
maxLatency = transport.getMaxLatency();
}
void write() {
connRegistry.registerConnection(contactId, transportId);
try {
OutputStream out = transport.getOutputStream();
long capacity = transport.getCapacity();
int maxFrameLength = transport.getMaxFrameLength();
ConnectionWriter conn = connWriterFactory.createConnectionWriter(
out, maxFrameLength, capacity, ctx, false, true);
out = conn.getOutputStream();
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH)
throw new EOFException();
PacketWriter writer = packetWriterFactory.createPacketWriter(out,
transport.shouldFlush());
// Send the initial packets: updates and acks
boolean hasSpace = writeTransportAcks(conn, writer);
if(hasSpace) hasSpace = writeTransportUpdates(conn, writer);
if(hasSpace) hasSpace = writeSubscriptionAck(conn, writer);
if(hasSpace) hasSpace = writeSubscriptionUpdate(conn, writer);
if(hasSpace) hasSpace = writeRetentionAck(conn, writer);
if(hasSpace) hasSpace = writeRetentionUpdate(conn, writer);
// Write acks until you can't write acks no more
capacity = conn.getRemainingCapacity();
int maxMessages = writer.getMaxMessagesForRequest(capacity);
Ack a = db.generateAck(contactId, maxMessages);
while(a != null) {
writer.writeAck(a);
capacity = conn.getRemainingCapacity();
maxMessages = writer.getMaxMessagesForRequest(capacity);
a = db.generateAck(contactId, maxMessages);
}
// Write messages until you can't write messages no more
capacity = conn.getRemainingCapacity();
int maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
Collection<byte[]> batch = db.generateBatch(contactId, maxLength,
maxLatency);
while(batch != null) {
for(byte[] raw : batch) writer.writeMessage(raw);
capacity = conn.getRemainingCapacity();
maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
batch = db.generateBatch(contactId, maxLength, maxLatency);
}
writer.flush();
writer.close();
dispose(false);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true);
}
connRegistry.unregisterConnection(contactId, transportId);
}
private boolean writeTransportAcks(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
Collection<TransportAck> acks = db.generateTransportAcks(contactId);
if(acks == null) return true;
for(TransportAck a : acks) {
writer.writeTransportAck(a);
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH) return false;
}
return true;
}
private boolean writeTransportUpdates(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
Collection<TransportUpdate> updates =
db.generateTransportUpdates(contactId, maxLatency);
if(updates == null) return true;
for(TransportUpdate u : updates) {
writer.writeTransportUpdate(u);
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH) return false;
}
return true;
}
private boolean writeSubscriptionAck(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(a == null) return true;
writer.writeSubscriptionAck(a);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeSubscriptionUpdate(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if(u == null) return true;
writer.writeSubscriptionUpdate(u);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeRetentionAck(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
RetentionAck a = db.generateRetentionAck(contactId);
if(a == null) return true;
writer.writeRetentionAck(a);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeRetentionUpdate(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
RetentionUpdate u = db.generateRetentionUpdate(contactId, maxLatency);
if(u == null) return true;
writer.writeRetentionUpdate(u);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private void dispose(boolean exception) {
ByteUtils.erase(ctx.getSecret());
try {
transport.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}

View File

@@ -0,0 +1,93 @@
package org.briarproject.messaging.simplex;
import static java.util.logging.Level.WARNING;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.crypto.KeyManager;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriterFactory;
class SimplexConnectionFactoryImpl implements SimplexConnectionFactory {
private static final Logger LOG =
Logger.getLogger(SimplexConnectionFactoryImpl.class.getName());
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final KeyManager keyManager;
private final ConnectionRegistry connRegistry;
private final ConnectionReaderFactory connReaderFactory;
private final ConnectionWriterFactory connWriterFactory;
private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory;
@Inject
SimplexConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
KeyManager keyManager, ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) {
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.keyManager = keyManager;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
}
public void createIncomingConnection(ConnectionContext ctx,
SimplexTransportReader r) {
final IncomingSimplexConnection conn = new IncomingSimplexConnection(
dbExecutor, cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, packetReaderFactory, ctx, r);
Runnable read = new Runnable() {
public void run() {
conn.read();
}
};
new Thread(read, "SimplexConnectionReader").start();
}
public void createOutgoingConnection(ContactId c, TransportId t,
SimplexTransportWriter w) {
ConnectionContext ctx = keyManager.getConnectionContext(c, t);
if(ctx == null) {
if(LOG.isLoggable(WARNING))
LOG.warning("Could not create outgoing connection context");
return;
}
final OutgoingSimplexConnection conn = new OutgoingSimplexConnection(db,
connRegistry, connWriterFactory, packetWriterFactory, ctx, w);
Runnable write = new Runnable() {
public void run() {
conn.write();
}
};
new Thread(write, "SimplexConnectionWriter").start();
}
}

View File

@@ -0,0 +1,15 @@
package org.briarproject.messaging.simplex;
import javax.inject.Singleton;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import com.google.inject.AbstractModule;
public class SimplexMessagingModule extends AbstractModule {
protected void configure() {
bind(SimplexConnectionFactory.class).to(
SimplexConnectionFactoryImpl.class).in(Singleton.class);
}
}