Moved classes from messaging package to sync package.

This commit is contained in:
akwizgran
2015-12-15 14:36:45 +00:00
parent d99df73380
commit e370cafb12
94 changed files with 905 additions and 801 deletions

View File

@@ -0,0 +1,58 @@
package org.briarproject.sync;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorFactory;
import org.briarproject.api.AuthorId;
import org.briarproject.api.LocalAuthor;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.Writer;
import org.briarproject.api.data.WriterFactory;
import org.briarproject.api.system.Clock;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.inject.Inject;
class AuthorFactoryImpl implements AuthorFactory {
private final CryptoComponent crypto;
private final WriterFactory writerFactory;
private final Clock clock;
@Inject
AuthorFactoryImpl(CryptoComponent crypto, WriterFactory writerFactory,
Clock clock) {
this.crypto = crypto;
this.writerFactory = writerFactory;
this.clock = clock;
}
public Author createAuthor(String name, byte[] publicKey) {
return new Author(getId(name, publicKey), name, publicKey);
}
public LocalAuthor createLocalAuthor(String name, byte[] publicKey,
byte[] privateKey) {
return new LocalAuthor(getId(name, publicKey), name, publicKey,
privateKey, clock.currentTimeMillis());
}
private AuthorId getId(String name, byte[] publicKey) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Writer w = writerFactory.createWriter(out);
try {
w.writeListStart();
w.writeString(name);
w.writeRaw(publicKey);
w.writeListEnd();
} catch (IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException();
}
MessageDigest messageDigest = crypto.getMessageDigest();
messageDigest.update(out.toByteArray());
return new AuthorId(messageDigest.digest());
}
}

View File

@@ -0,0 +1,40 @@
package org.briarproject.sync;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorId;
import org.briarproject.api.FormatException;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.data.Reader;
import java.io.IOException;
import static org.briarproject.api.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
import static org.briarproject.api.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
class AuthorReader implements ObjectReader<Author> {
private final MessageDigest messageDigest;
AuthorReader(CryptoComponent crypto) {
messageDigest = crypto.getMessageDigest();
}
public Author readObject(Reader r) throws IOException {
// Set up the reader
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
r.addConsumer(digesting);
// Read and digest the data
r.readListStart();
String name = r.readString(MAX_AUTHOR_NAME_LENGTH);
if (name.length() == 0) throw new FormatException();
byte[] publicKey = r.readRaw(MAX_PUBLIC_KEY_LENGTH);
r.readListEnd();
// Reset the reader
r.removeConsumer(digesting);
// Build and return the author
AuthorId id = new AuthorId(messageDigest.digest());
return new Author(id, name, publicKey);
}
}

View File

@@ -0,0 +1,24 @@
package org.briarproject.sync;
import org.briarproject.api.data.Consumer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/** A consumer that makes a copy of the bytes consumed. */
class CopyingConsumer implements Consumer {
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
public byte[] getCopy() {
return out.toByteArray();
}
public void write(byte b) throws IOException {
out.write(b);
}
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
}

View File

@@ -0,0 +1,34 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.data.Consumer;
import java.io.IOException;
/**
* A consumer that counts the number of bytes consumed and throws a
* FormatException if the count exceeds a given limit.
*/
class CountingConsumer implements Consumer {
private final long limit;
private long count = 0;
public CountingConsumer(long limit) {
this.limit = limit;
}
public long getCount() {
return count;
}
public void write(byte b) throws IOException {
count++;
if (count > limit) throw new FormatException();
}
public void write(byte[] b, int off, int len) throws IOException {
count += len;
if (count > limit) throw new FormatException();
}
}

View File

@@ -0,0 +1,22 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.Consumer;
/** A consumer that passes its input through a message digest. */
class DigestingConsumer implements Consumer {
private final MessageDigest messageDigest;
public DigestingConsumer(MessageDigest messageDigest) {
this.messageDigest = messageDigest;
}
public void write(byte b) {
messageDigest.update(b);
}
public void write(byte[] b, int off, int len) {
messageDigest.update(b, off, len);
}
}

View File

@@ -0,0 +1,576 @@
package org.briarproject.sync;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageExpiredEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.RemoteRetentionTimeUpdatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.system.Clock;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
/**
* An outgoing {@link MessagingSession
* MessagingSession} suitable for duplex transports. The session offers
* messages before sending them, keeps its output stream open when there are no
* packets to send, and reacts to events that make packets available to send.
*/
class DuplexOutgoingSession implements MessagingSession, EventListener {
// Check for retransmittable packets once every 60 seconds
private static final int RETX_QUERY_INTERVAL = 60 * 1000;
private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE =
new ThrowingRunnable<IOException>() {
public void run() {}
};
private final DatabaseComponent db;
private final Executor dbExecutor;
private final EventBus eventBus;
private final Clock clock;
private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency, maxIdleTime;
private final PacketWriter packetWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
// The following must only be accessed on the writer thread
private long nextKeepalive = 0, nextRetxQuery = 0;
private boolean dataToFlush = true;
private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId,
TransportId transportId, int maxLatency, int maxIdleTime,
PacketWriter packetWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.clock = clock;
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.packetWriter = packetWriter;
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
public void run() throws IOException {
eventBus.addListener(this);
try {
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
dbExecutor.execute(new GenerateRequest());
long now = clock.currentTimeMillis();
nextKeepalive = now + maxIdleTime;
nextRetxQuery = now + RETX_QUERY_INTERVAL;
// Write packets until interrupted
try {
while (!interrupted) {
// Work out how long we should wait for a packet
now = clock.currentTimeMillis();
long wait = Math.min(nextKeepalive, nextRetxQuery) - now;
if (wait < 0) wait = 0;
// Flush any unflushed data if we're going to wait
if (wait > 0 && dataToFlush && writerTasks.isEmpty()) {
packetWriter.flush();
dataToFlush = false;
nextKeepalive = now + maxIdleTime;
}
// Wait for a packet
ThrowingRunnable<IOException> task = writerTasks.poll(wait,
MILLISECONDS);
if (task == null) {
now = clock.currentTimeMillis();
if (now >= nextRetxQuery) {
// Check for retransmittable packets
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
nextRetxQuery = now + RETX_QUERY_INTERVAL;
}
if (now >= nextKeepalive) {
// Flush the stream to keep it alive
packetWriter.flush();
dataToFlush = false;
nextKeepalive = now + maxIdleTime;
}
} else if (task == CLOSE) {
break;
} else {
task.run();
dataToFlush = true;
}
}
if (dataToFlush) packetWriter.flush();
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt();
}
} finally {
eventBus.removeListener(this);
}
}
public void interrupt() {
interrupted = true;
writerTasks.add(CLOSE);
}
public void eventOccurred(Event e) {
if (e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageAddedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if (e instanceof MessageExpiredEvent) {
dbExecutor.execute(new GenerateRetentionUpdate());
} else if (e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
if (l.getAffectedContacts().contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateOffer());
}
} else if (e instanceof LocalTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdates());
} else if (e instanceof MessageRequestedEvent) {
if (((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch());
} else if (e instanceof MessageToAckEvent) {
if (((MessageToAckEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateAck());
} else if (e instanceof MessageToRequestEvent) {
if (((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest());
} else if (e instanceof RemoteRetentionTimeUpdatedEvent) {
RemoteRetentionTimeUpdatedEvent r =
(RemoteRetentionTimeUpdatedEvent) e;
if (r.getContactId().equals(contactId))
dbExecutor.execute(new GenerateRetentionAck());
} else if (e instanceof RemoteSubscriptionsUpdatedEvent) {
RemoteSubscriptionsUpdatedEvent r =
(RemoteSubscriptionsUpdatedEvent) e;
if (r.getContactId().equals(contactId)) {
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateOffer());
}
} else if (e instanceof RemoteTransportsUpdatedEvent) {
RemoteTransportsUpdatedEvent r =
(RemoteTransportsUpdatedEvent) e;
if (r.getContactId().equals(contactId))
dbExecutor.execute(new GenerateTransportAcks());
} else if (e instanceof ShutdownEvent) {
interrupt();
} else if (e instanceof TransportRemovedEvent) {
TransportRemovedEvent t = (TransportRemovedEvent) e;
if (t.getTransportId().equals(transportId)) interrupt();
}
}
// This task runs on the database thread
private class GenerateAck implements Runnable {
public void run() {
if (interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a != null) writerTasks.add(new WriteAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteAck implements ThrowingRunnable<IOException> {
private final Ack ack;
private WriteAck(Ack ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeAck(ack);
LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck());
}
}
// This task runs on the database thread
private class GenerateBatch implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<byte[]> b = db.generateRequestedBatch(contactId,
MAX_PAYLOAD_LENGTH, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b != null) writerTasks.add(new WriteBatch(b));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch;
private WriteBatch(Collection<byte[]> batch) {
this.batch = batch;
}
public void run() throws IOException {
if (interrupted) return;
for (byte[] raw : batch) packetWriter.writeMessage(raw);
LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
}
}
// This task runs on the database thread
private class GenerateOffer implements Runnable {
public void run() {
if (interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForOffer(
Long.MAX_VALUE);
try {
Offer o = db.generateOffer(contactId, maxMessages, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if (o != null) writerTasks.add(new WriteOffer(o));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteOffer implements ThrowingRunnable<IOException> {
private final Offer offer;
private WriteOffer(Offer offer) {
this.offer = offer;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeOffer(offer);
LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer());
}
}
// This task runs on the database thread
private class GenerateRequest implements Runnable {
public void run() {
if (interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForRequest(
Long.MAX_VALUE);
try {
Request r = db.generateRequest(contactId, maxMessages);
if (LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if (r != null) writerTasks.add(new WriteRequest(r));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRequest implements ThrowingRunnable<IOException> {
private final Request request;
private WriteRequest(Request request) {
this.request = request;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRequest(request);
LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest());
}
}
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionAck a = db.generateRetentionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if (a != null) writerTasks.add(new WriteRetentionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements ThrowingRunnable<IOException> {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
}
}
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if (u != null) writerTasks.add(new WriteRetentionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate
implements ThrowingRunnable<IOException> {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if (a != null) writerTasks.add(new WriteSubscriptionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck
implements ThrowingRunnable<IOException> {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
}
}
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if (u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate
implements ThrowingRunnable<IOException> {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if (acks != null) writerTasks.add(new WriteTransportAcks(acks));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements ThrowingRunnable<IOException> {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
}
}
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if (t != null) writerTasks.add(new WriteTransportUpdates(t));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates
implements ThrowingRunnable<IOException> {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
}
}
}

View File

@@ -0,0 +1,52 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.Writer;
import org.briarproject.api.data.WriterFactory;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.GroupId;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.inject.Inject;
import static org.briarproject.api.sync.MessagingConstants.GROUP_SALT_LENGTH;
class GroupFactoryImpl implements GroupFactory {
private final CryptoComponent crypto;
private final WriterFactory writerFactory;
@Inject
GroupFactoryImpl(CryptoComponent crypto, WriterFactory writerFactory) {
this.crypto = crypto;
this.writerFactory = writerFactory;
}
public Group createGroup(String name) {
byte[] salt = new byte[GROUP_SALT_LENGTH];
crypto.getSecureRandom().nextBytes(salt);
return createGroup(name, salt);
}
public Group createGroup(String name, byte[] salt) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Writer w = writerFactory.createWriter(out);
try {
w.writeListStart();
w.writeString(name);
w.writeRaw(salt);
w.writeListEnd();
} catch (IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException();
}
MessageDigest messageDigest = crypto.getMessageDigest();
messageDigest.update(out.toByteArray());
GroupId id = new GroupId(messageDigest.digest());
return new Group(id, name, salt);
}
}

View File

@@ -0,0 +1,39 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.data.Reader;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
import java.io.IOException;
import static org.briarproject.api.sync.MessagingConstants.GROUP_SALT_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_GROUP_NAME_LENGTH;
class GroupReader implements ObjectReader<Group> {
private final MessageDigest messageDigest;
GroupReader(CryptoComponent crypto) {
messageDigest = crypto.getMessageDigest();
}
public Group readObject(Reader r) throws IOException {
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
// Read and digest the data
r.addConsumer(digesting);
r.readListStart();
String name = r.readString(MAX_GROUP_NAME_LENGTH);
if (name.length() == 0) throw new FormatException();
byte[] salt = r.readRaw(GROUP_SALT_LENGTH);
if (salt.length != GROUP_SALT_LENGTH) throw new FormatException();
r.readListEnd();
r.removeConsumer(digesting);
// Build and return the group
GroupId id = new GroupId(messageDigest.digest());
return new Group(id, name, salt);
}
}

View File

@@ -0,0 +1,328 @@
package org.briarproject.sync;
import org.briarproject.api.ContactId;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
/**
* An incoming {@link MessagingSession
* MessagingSession}.
*/
class IncomingSession implements MessagingSession, EventListener {
private static final Logger LOG =
Logger.getLogger(IncomingSession.class.getName());
private final DatabaseComponent db;
private final Executor dbExecutor, cryptoExecutor;
private final EventBus eventBus;
private final MessageVerifier messageVerifier;
private final ContactId contactId;
private final TransportId transportId;
private final PacketReader packetReader;
private volatile boolean interrupted = false;
IncomingSession(DatabaseComponent db, Executor dbExecutor,
Executor cryptoExecutor, EventBus eventBus,
MessageVerifier messageVerifier, ContactId contactId,
TransportId transportId, PacketReader packetReader) {
this.db = db;
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.eventBus = eventBus;
this.messageVerifier = messageVerifier;
this.contactId = contactId;
this.transportId = transportId;
this.packetReader = packetReader;
}
public void run() throws IOException {
eventBus.addListener(this);
try {
// Read packets until interrupted or EOF
while (!interrupted && !packetReader.eof()) {
if (packetReader.hasAck()) {
Ack a = packetReader.readAck();
dbExecutor.execute(new ReceiveAck(a));
} else if (packetReader.hasMessage()) {
UnverifiedMessage m = packetReader.readMessage();
cryptoExecutor.execute(new VerifyMessage(m));
} else if (packetReader.hasOffer()) {
Offer o = packetReader.readOffer();
dbExecutor.execute(new ReceiveOffer(o));
} else if (packetReader.hasRequest()) {
Request r = packetReader.readRequest();
dbExecutor.execute(new ReceiveRequest(r));
} else if (packetReader.hasRetentionAck()) {
RetentionAck a = packetReader.readRetentionAck();
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if (packetReader.hasRetentionUpdate()) {
RetentionUpdate u = packetReader.readRetentionUpdate();
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if (packetReader.hasSubscriptionAck()) {
SubscriptionAck a = packetReader.readSubscriptionAck();
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if (packetReader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = packetReader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if (packetReader.hasTransportAck()) {
TransportAck a = packetReader.readTransportAck();
dbExecutor.execute(new ReceiveTransportAck(a));
} else if (packetReader.hasTransportUpdate()) {
TransportUpdate u = packetReader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
}
} finally {
eventBus.removeListener(this);
}
}
public void interrupt() {
// FIXME: This won't interrupt a blocking read
interrupted = true;
}
public void eventOccurred(Event e) {
if (e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof ShutdownEvent) {
interrupt();
} else if (e instanceof TransportRemovedEvent) {
TransportRemovedEvent t = (TransportRemovedEvent) e;
if (t.getTransportId().equals(transportId)) interrupt();
}
}
private class ReceiveAck implements Runnable {
private final Ack ack;
private ReceiveAck(Ack ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveAck(contactId, ack);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class VerifyMessage implements Runnable {
private final UnverifiedMessage message;
private VerifyMessage(UnverifiedMessage message) {
this.message = message;
}
public void run() {
try {
Message m = messageVerifier.verifyMessage(message);
dbExecutor.execute(new ReceiveMessage(m));
} catch (GeneralSecurityException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveMessage implements Runnable {
private final Message message;
private ReceiveMessage(Message message) {
this.message = message;
}
public void run() {
try {
db.receiveMessage(contactId, message);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveOffer implements Runnable {
private final Offer offer;
private ReceiveOffer(Offer offer) {
this.offer = offer;
}
public void run() {
try {
db.receiveOffer(contactId, offer);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveRequest implements Runnable {
private final Request request;
private ReceiveRequest(Request request) {
this.request = request;
}
public void run() {
try {
db.receiveRequest(contactId, request);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
private ReceiveRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveRetentionAck(contactId, ack);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private ReceiveRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveRetentionUpdate(contactId, update);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private ReceiveSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveSubscriptionAck(contactId, ack);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
private ReceiveSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveSubscriptionUpdate(contactId, update);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
private ReceiveTransportAck(TransportAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveTransportAck(contactId, ack);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update;
private ReceiveTransportUpdate(TransportUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveTransportUpdate(contactId, update);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
}

View File

@@ -0,0 +1,129 @@
package org.briarproject.sync;
import org.briarproject.api.Author;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.crypto.PrivateKey;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.data.Consumer;
import org.briarproject.api.data.Writer;
import org.briarproject.api.data.WriterFactory;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageFactory;
import org.briarproject.api.sync.MessageId;
import org.briarproject.util.StringUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.inject.Inject;
import static org.briarproject.api.AuthorConstants.MAX_SIGNATURE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_BODY_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MESSAGE_SALT_LENGTH;
class MessageFactoryImpl implements MessageFactory {
private final Signature signature;
private final SecureRandom random;
private final MessageDigest messageDigest;
private final WriterFactory writerFactory;
@Inject
MessageFactoryImpl(CryptoComponent crypto, WriterFactory writerFactory) {
signature = crypto.getSignature();
random = crypto.getSecureRandom();
messageDigest = crypto.getMessageDigest();
this.writerFactory = writerFactory;
}
public Message createAnonymousMessage(MessageId parent, Group group,
String contentType, long timestamp, byte[] body) throws IOException,
GeneralSecurityException {
return createMessage(parent, group, null, null, contentType, timestamp,
body);
}
public Message createPseudonymousMessage(MessageId parent, Group group,
Author author, PrivateKey privateKey, String contentType,
long timestamp, byte[] body) throws IOException,
GeneralSecurityException {
return createMessage(parent, group, author, privateKey, contentType,
timestamp, body);
}
private Message createMessage(MessageId parent, Group group, Author author,
PrivateKey privateKey, String contentType, long timestamp,
byte[] body) throws IOException, GeneralSecurityException {
// Validate the arguments
if ((author == null) != (privateKey == null))
throw new IllegalArgumentException();
if (StringUtils.toUtf8(contentType).length > MAX_CONTENT_TYPE_LENGTH)
throw new IllegalArgumentException();
if (body.length > MAX_BODY_LENGTH)
throw new IllegalArgumentException();
// Serialise the message to a buffer
ByteArrayOutputStream out = new ByteArrayOutputStream();
Writer w = writerFactory.createWriter(out);
// Initialise the consumers
CountingConsumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
w.addConsumer(counting);
Consumer digestingConsumer = new DigestingConsumer(messageDigest);
w.addConsumer(digestingConsumer);
Consumer signingConsumer = null;
if (privateKey != null) {
signature.initSign(privateKey);
signingConsumer = new org.briarproject.sync.SigningConsumer(signature);
w.addConsumer(signingConsumer);
}
// Write the message
w.writeListStart();
if (parent == null) w.writeNull();
else w.writeRaw(parent.getBytes());
writeGroup(w, group);
if (author == null) w.writeNull();
else writeAuthor(w, author);
w.writeString(contentType);
w.writeInteger(timestamp);
byte[] salt = new byte[MESSAGE_SALT_LENGTH];
random.nextBytes(salt);
w.writeRaw(salt);
w.writeRaw(body);
int bodyStart = (int) counting.getCount() - body.length;
// Sign the message with the author's private key, if there is one
if (privateKey == null) {
w.writeNull();
} else {
w.removeConsumer(signingConsumer);
byte[] sig = signature.sign();
if (sig.length > MAX_SIGNATURE_LENGTH)
throw new IllegalArgumentException();
w.writeRaw(sig);
}
w.writeListEnd();
// Hash the message, including the signature, to get the message ID
w.removeConsumer(digestingConsumer);
MessageId id = new MessageId(messageDigest.digest());
return new org.briarproject.sync.MessageImpl(id, parent, group, author, contentType,
timestamp, out.toByteArray(), bodyStart, body.length);
}
private void writeGroup(Writer w, Group g) throws IOException {
w.writeListStart();
w.writeString(g.getName());
w.writeRaw(g.getSalt());
w.writeListEnd();
}
private void writeAuthor(Writer w, Author a) throws IOException {
w.writeListStart();
w.writeString(a.getName());
w.writeRaw(a.getPublicKey());
w.writeListEnd();
}
}

View File

@@ -0,0 +1,84 @@
package org.briarproject.sync;
import org.briarproject.api.Author;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import static org.briarproject.api.sync.MessagingConstants.MAX_BODY_LENGTH;
/** A simple in-memory implementation of a message. */
class MessageImpl implements Message {
private final MessageId id, parent;
private final Group group;
private final Author author;
private final String contentType;
private final long timestamp;
private final byte[] raw;
private final int bodyStart, bodyLength;
public MessageImpl(MessageId id, MessageId parent, Group group,
Author author, String contentType, long timestamp,
byte[] raw, int bodyStart, int bodyLength) {
if (bodyStart + bodyLength > raw.length)
throw new IllegalArgumentException();
if (bodyLength > MAX_BODY_LENGTH)
throw new IllegalArgumentException();
this.id = id;
this.parent = parent;
this.group = group;
this.author = author;
this.contentType = contentType;
this.timestamp = timestamp;
this.raw = raw;
this.bodyStart = bodyStart;
this.bodyLength = bodyLength;
}
public MessageId getId() {
return id;
}
public MessageId getParent() {
return parent;
}
public Group getGroup() {
return group;
}
public Author getAuthor() {
return author;
}
public String getContentType() {
return contentType;
}
public long getTimestamp() {
return timestamp;
}
public byte[] getSerialised() {
return raw;
}
public int getBodyStart() {
return bodyStart;
}
public int getBodyLength() {
return bodyLength;
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public boolean equals(Object o) {
return o instanceof Message && id.equals(((Message) o).getId());
}
}

View File

@@ -0,0 +1,82 @@
package org.briarproject.sync;
import org.briarproject.api.Author;
import org.briarproject.api.FormatException;
import org.briarproject.api.UniqueId;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.data.Reader;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.UnverifiedMessage;
import java.io.IOException;
import static org.briarproject.api.AuthorConstants.MAX_SIGNATURE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_BODY_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MESSAGE_SALT_LENGTH;
class MessageReader implements ObjectReader<UnverifiedMessage> {
private final ObjectReader<Group> groupReader;
private final ObjectReader<Author> authorReader;
MessageReader(ObjectReader<Group> groupReader,
ObjectReader<Author> authorReader) {
this.groupReader = groupReader;
this.authorReader = authorReader;
}
public UnverifiedMessage readObject(Reader r) throws IOException {
CopyingConsumer copying = new CopyingConsumer();
CountingConsumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
r.addConsumer(copying);
r.addConsumer(counting);
// Read the start of the message
r.readListStart();
// Read the parent's message ID, if there is one
MessageId parent = null;
if (r.hasNull()) {
r.readNull();
} else {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length < UniqueId.LENGTH) throw new FormatException();
parent = new MessageId(b);
}
// Read the group
Group group = groupReader.readObject(r);
// Read the author, if there is one
Author author = null;
if (r.hasNull()) r.readNull();
else author = authorReader.readObject(r);
// Read the content type
String contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
// Read the timestamp
long timestamp = r.readInteger();
if (timestamp < 0) throw new FormatException();
// Read the salt
byte[] salt = r.readRaw(MESSAGE_SALT_LENGTH);
if (salt.length < MESSAGE_SALT_LENGTH) throw new FormatException();
// Read the message body
byte[] body = r.readRaw(MAX_BODY_LENGTH);
// Record the offset of the body within the message
int bodyStart = (int) counting.getCount() - body.length;
// Record the length of the data covered by the author's signature
int signedLength = (int) counting.getCount();
// Read the author's signature, if there is one
byte[] signature = null;
if (author == null) r.readNull();
else signature = r.readRaw(MAX_SIGNATURE_LENGTH);
// Read the end of the message
r.readListEnd();
// Reset the reader
r.removeConsumer(counting);
r.removeConsumer(copying);
// Build and return the unverified message
byte[] raw = copying.getCopy();
return new UnverifiedMessage(parent, group, author, contentType,
timestamp, raw, signature, bodyStart, body.length,
signedLength);
}
}

View File

@@ -0,0 +1,68 @@
package org.briarproject.sync;
import org.briarproject.api.Author;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.KeyParser;
import org.briarproject.api.crypto.MessageDigest;
import org.briarproject.api.crypto.PublicKey;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.UnverifiedMessage;
import org.briarproject.api.system.Clock;
import java.security.GeneralSecurityException;
import java.util.logging.Logger;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static org.briarproject.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE;
class MessageVerifierImpl implements MessageVerifier {
private static final Logger LOG =
Logger.getLogger(MessageVerifierImpl.class.getName());
private final CryptoComponent crypto;
private final Clock clock;
private final KeyParser keyParser;
@Inject
MessageVerifierImpl(CryptoComponent crypto, Clock clock) {
this.crypto = crypto;
this.clock = clock;
keyParser = crypto.getSignatureKeyParser();
}
public Message verifyMessage(UnverifiedMessage m)
throws GeneralSecurityException {
long now = System.currentTimeMillis();
MessageDigest messageDigest = crypto.getMessageDigest();
Signature signature = crypto.getSignature();
// Reject the message if it's too far in the future
if (m.getTimestamp() > clock.currentTimeMillis() + MAX_CLOCK_DIFFERENCE)
throw new GeneralSecurityException();
// Hash the message to get the message ID
byte[] raw = m.getSerialised();
messageDigest.update(raw);
MessageId id = new MessageId(messageDigest.digest());
// Verify the author's signature, if there is one
Author author = m.getAuthor();
if (author != null) {
PublicKey k = keyParser.parsePublicKey(author.getPublicKey());
signature.initVerify(k);
signature.update(raw, 0, m.getSignedLength());
if (!signature.verify(m.getSignature()))
throw new GeneralSecurityException();
}
Message verified = new MessageImpl(id, m.getParent(), m.getGroup(),
author, m.getContentType(), m.getTimestamp(), raw,
m.getBodyStart(), m.getBodyLength());
long duration = System.currentTimeMillis() - now;
if (LOG.isLoggable(INFO))
LOG.info("Verifying message took " + duration + " ms");
return verified;
}
}

View File

@@ -0,0 +1,58 @@
package org.briarproject.sync;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorFactory;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.MessageFactory;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.MessagingSessionFactory;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.PacketWriterFactory;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import javax.inject.Singleton;
public class MessagingModule extends AbstractModule {
@Override
protected void configure() {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
bind(GroupFactory.class).to(GroupFactoryImpl.class);
bind(MessageFactory.class).to(MessageFactoryImpl.class);
bind(MessageVerifier.class).to(MessageVerifierImpl.class);
bind(PacketReaderFactory.class).to(PacketReaderFactoryImpl.class);
bind(PacketWriterFactory.class).to(PacketWriterFactoryImpl.class);
bind(MessagingSessionFactory.class).to(
MessagingSessionFactoryImpl.class).in(Singleton.class);
}
@Provides
ObjectReader<Author> getAuthorReader(CryptoComponent crypto) {
return new AuthorReader(crypto);
}
@Provides
ObjectReader<Group> getGroupReader(CryptoComponent crypto) {
return new GroupReader(crypto);
}
@Provides
ObjectReader<UnverifiedMessage> getMessageReader(
ObjectReader<Group> groupReader,
ObjectReader<Author> authorReader) {
return new org.briarproject.sync.MessageReader(groupReader, authorReader);
}
@Provides
ObjectReader<SubscriptionUpdate> getSubscriptionUpdateReader(
ObjectReader<Group> groupReader) {
return new SubscriptionUpdateReader(groupReader);
}
}

View File

@@ -0,0 +1,71 @@
package org.briarproject.sync;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.sync.MessageVerifier;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.MessagingSessionFactory;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.PacketWriterFactory;
import org.briarproject.api.system.Clock;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import javax.inject.Inject;
class MessagingSessionFactoryImpl implements MessagingSessionFactory {
private final DatabaseComponent db;
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final EventBus eventBus;
private final Clock clock;
private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory;
@Inject
MessagingSessionFactoryImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, EventBus eventBus, Clock clock,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) {
this.db = db;
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.eventBus = eventBus;
this.clock = clock;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
}
public MessagingSession createIncomingSession(ContactId c, TransportId t,
InputStream in) {
PacketReader packetReader = packetReaderFactory.createPacketReader(in);
return new IncomingSession(db, dbExecutor, cryptoExecutor, eventBus,
messageVerifier, c, t, packetReader);
}
public MessagingSession createSimplexOutgoingSession(ContactId c,
TransportId t, int maxLatency, OutputStream out) {
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, packetWriter);
}
public MessagingSession createDuplexOutgoingSession(ContactId c,
TransportId t, int maxLatency, int maxIdleTime, OutputStream out) {
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
maxLatency, maxIdleTime, packetWriter);
}
}

View File

@@ -0,0 +1,33 @@
package org.briarproject.sync;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.data.ReaderFactory;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import java.io.InputStream;
import javax.inject.Inject;
class PacketReaderFactoryImpl implements PacketReaderFactory {
private final ReaderFactory readerFactory;
private final ObjectReader<UnverifiedMessage> messageReader;
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
@Inject
PacketReaderFactoryImpl(ReaderFactory readerFactory,
ObjectReader<UnverifiedMessage> messageReader,
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader) {
this.readerFactory = readerFactory;
this.messageReader = messageReader;
this.subscriptionUpdateReader = subscriptionUpdateReader;
}
public PacketReader createPacketReader(InputStream in) {
return new PacketReaderImpl(readerFactory, messageReader,
subscriptionUpdateReader, in);
}
}

View File

@@ -0,0 +1,361 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.UniqueId;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.data.Reader;
import org.briarproject.api.data.ReaderFactory;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.sync.UnverifiedMessage;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTIES_PER_TRANSPORT;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
import static org.briarproject.api.TransportPropertyConstants.MAX_TRANSPORT_ID_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.HEADER_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.PROTOCOL_VERSION;
import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.MESSAGE;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.RETENTION_ACK;
import static org.briarproject.api.sync.PacketTypes.RETENTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_UPDATE;
// This class is not thread-safe
class PacketReaderImpl implements PacketReader {
private enum State { BUFFER_EMPTY, BUFFER_FULL, EOF }
private final ReaderFactory readerFactory;
private final ObjectReader<UnverifiedMessage> messageReader;
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
private final InputStream in;
private final byte[] header, payload;
private State state = State.BUFFER_EMPTY;
private int payloadLength = 0;
PacketReaderImpl(ReaderFactory readerFactory,
ObjectReader<UnverifiedMessage> messageReader,
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader,
InputStream in) {
this.readerFactory = readerFactory;
this.messageReader = messageReader;
this.subscriptionUpdateReader = subscriptionUpdateReader;
this.in = in;
header = new byte[HEADER_LENGTH];
payload = new byte[MAX_PAYLOAD_LENGTH];
}
private void readPacket() throws IOException {
assert state == State.BUFFER_EMPTY;
// Read the header
int offset = 0;
while (offset < HEADER_LENGTH) {
int read = in.read(header, offset, HEADER_LENGTH - offset);
if (read == -1) {
if (offset > 0) throw new FormatException();
state = State.EOF;
return;
}
offset += read;
}
// Check the protocol version
if (header[0] != PROTOCOL_VERSION) throw new FormatException();
// Read the payload length
payloadLength = ByteUtils.readUint16(header, 2);
if (payloadLength > MAX_PAYLOAD_LENGTH) throw new FormatException();
// Read the payload
offset = 0;
while (offset < payloadLength) {
int read = in.read(payload, offset, payloadLength - offset);
if (read == -1) throw new FormatException();
offset += read;
}
state = State.BUFFER_FULL;
}
public boolean eof() throws IOException {
if (state == State.BUFFER_EMPTY) readPacket();
assert state != State.BUFFER_EMPTY;
return state == State.EOF;
}
public boolean hasAck() throws IOException {
return !eof() && header[1] == ACK;
}
public Ack readAck() throws IOException {
if (!hasAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the message IDs
List<MessageId> acked = new ArrayList<MessageId>();
r.readListStart();
while (!r.hasListEnd()) {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length != UniqueId.LENGTH)
throw new FormatException();
acked.add(new MessageId(b));
}
if (acked.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the ack
return new Ack(Collections.unmodifiableList(acked));
}
public boolean hasMessage() throws IOException {
return !eof() && header[1] == MESSAGE;
}
public UnverifiedMessage readMessage() throws IOException {
if (!hasMessage()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read and build the message
UnverifiedMessage m = messageReader.readObject(r);
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
return m;
}
public boolean hasOffer() throws IOException {
return !eof() && header[1] == OFFER;
}
public Offer readOffer() throws IOException {
if (!hasOffer()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the message IDs
List<MessageId> offered = new ArrayList<MessageId>();
r.readListStart();
while (!r.hasListEnd()) {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length != UniqueId.LENGTH)
throw new FormatException();
offered.add(new MessageId(b));
}
if (offered.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the offer
return new Offer(Collections.unmodifiableList(offered));
}
public boolean hasRequest() throws IOException {
return !eof() && header[1] == REQUEST;
}
public Request readRequest() throws IOException {
if (!hasRequest()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the message IDs
r.readListStart();
List<MessageId> requested = new ArrayList<MessageId>();
while (!r.hasListEnd()) {
byte[] b = r.readRaw(UniqueId.LENGTH);
if (b.length != UniqueId.LENGTH)
throw new FormatException();
requested.add(new MessageId(b));
}
if (requested.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the request
return new Request(Collections.unmodifiableList(requested));
}
public boolean hasRetentionAck() throws IOException {
return !eof() && header[1] == RETENTION_ACK;
}
public RetentionAck readRetentionAck() throws IOException {
if (!hasRetentionAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the version
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the retention ack
return new RetentionAck(version);
}
public boolean hasRetentionUpdate() throws IOException {
return !eof() && header[1] == RETENTION_UPDATE;
}
public RetentionUpdate readRetentionUpdate() throws IOException {
if (!hasRetentionUpdate()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the retention time and version
long retention = r.readInteger();
if (retention < 0) throw new FormatException();
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the retention update
return new RetentionUpdate(retention, version);
}
public boolean hasSubscriptionAck() throws IOException {
return !eof() && header[1] == SUBSCRIPTION_ACK;
}
public SubscriptionAck readSubscriptionAck() throws IOException {
if (!hasSubscriptionAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the version
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the subscription ack
return new SubscriptionAck(version);
}
public boolean hasSubscriptionUpdate() throws IOException {
return !eof() && header[1] == SUBSCRIPTION_UPDATE;
}
public SubscriptionUpdate readSubscriptionUpdate() throws IOException {
if (!hasSubscriptionUpdate()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read and build the subscription update
SubscriptionUpdate u = subscriptionUpdateReader.readObject(r);
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
return u;
}
public boolean hasTransportAck() throws IOException {
return !eof() && header[1] == TRANSPORT_ACK;
}
public TransportAck readTransportAck() throws IOException {
if (!hasTransportAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the transport ID and version
String idString = r.readString(MAX_TRANSPORT_ID_LENGTH);
if (idString.length() == 0) throw new FormatException();
TransportId id = new TransportId(idString);
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the transport ack
return new TransportAck(id, version);
}
public boolean hasTransportUpdate() throws IOException {
return !eof() && header[1] == TRANSPORT_UPDATE;
}
public TransportUpdate readTransportUpdate() throws IOException {
if (!hasTransportUpdate()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the transport ID
String idString = r.readString(MAX_TRANSPORT_ID_LENGTH);
if (idString.length() == 0) throw new FormatException();
TransportId id = new TransportId(idString);
// Read the transport properties
Map<String, String> p = new HashMap<String, String>();
r.readMapStart();
for (int i = 0; !r.hasMapEnd(); i++) {
if (i == MAX_PROPERTIES_PER_TRANSPORT)
throw new FormatException();
String key = r.readString(MAX_PROPERTY_LENGTH);
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
}
r.readMapEnd();
// Read the version number
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the transport update
return new TransportUpdate(id, new TransportProperties(p), version);
}
}

View File

@@ -0,0 +1,23 @@
package org.briarproject.sync;
import org.briarproject.api.data.WriterFactory;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.PacketWriterFactory;
import java.io.OutputStream;
import javax.inject.Inject;
class PacketWriterFactoryImpl implements PacketWriterFactory {
private final WriterFactory writerFactory;
@Inject
PacketWriterFactoryImpl(WriterFactory writerFactory) {
this.writerFactory = writerFactory;
}
public PacketWriter createPacketWriter(OutputStream out) {
return new PacketWriterImpl(writerFactory, out);
}
}

View File

@@ -0,0 +1,193 @@
package org.briarproject.sync;
import org.briarproject.api.data.Writer;
import org.briarproject.api.data.WriterFactory;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketTypes;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import static org.briarproject.api.data.DataConstants.LIST_END_LENGTH;
import static org.briarproject.api.data.DataConstants.LIST_START_LENGTH;
import static org.briarproject.api.data.DataConstants.UNIQUE_ID_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.HEADER_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.PROTOCOL_VERSION;
import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.RETENTION_ACK;
import static org.briarproject.api.sync.PacketTypes.RETENTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_UPDATE;
// This class is not thread-safe
class PacketWriterImpl implements PacketWriter {
private final WriterFactory writerFactory;
private final OutputStream out;
private final byte[] header;
private final ByteArrayOutputStream payload;
PacketWriterImpl(WriterFactory writerFactory, OutputStream out) {
this.writerFactory = writerFactory;
this.out = out;
header = new byte[HEADER_LENGTH];
header[0] = PROTOCOL_VERSION;
payload = new ByteArrayOutputStream(MAX_PAYLOAD_LENGTH);
}
public int getMaxMessagesForAck(long capacity) {
return getMaxMessagesForPacket(capacity);
}
public int getMaxMessagesForRequest(long capacity) {
return getMaxMessagesForPacket(capacity);
}
public int getMaxMessagesForOffer(long capacity) {
return getMaxMessagesForPacket(capacity);
}
private int getMaxMessagesForPacket(long capacity) {
int payload = (int) Math.min(capacity - HEADER_LENGTH,
MAX_PAYLOAD_LENGTH);
int overhead = LIST_START_LENGTH * 2 + LIST_END_LENGTH * 2;
return (payload - overhead) / UNIQUE_ID_LENGTH;
}
private void writePacket(byte packetType) throws IOException {
header[1] = packetType;
ByteUtils.writeUint16(payload.size(), header, 2);
out.write(header);
payload.writeTo(out);
payload.reset();
}
public void writeAck(Ack a) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (MessageId m : a.getMessageIds()) w.writeRaw(m.getBytes());
w.writeListEnd();
w.writeListEnd();
writePacket(ACK);
}
public void writeMessage(byte[] raw) throws IOException {
header[1] = PacketTypes.MESSAGE;
ByteUtils.writeUint16(raw.length, header, 2);
out.write(header);
out.write(raw);
}
public void writeOffer(Offer o) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (MessageId m : o.getMessageIds()) w.writeRaw(m.getBytes());
w.writeListEnd();
w.writeListEnd();
writePacket(OFFER);
}
public void writeRequest(Request r) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (MessageId m : r.getMessageIds()) w.writeRaw(m.getBytes());
w.writeListEnd();
w.writeListEnd();
writePacket(REQUEST);
}
public void writeRetentionAck(RetentionAck a) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeInteger(a.getVersion());
w.writeListEnd();
writePacket(RETENTION_ACK);
}
public void writeRetentionUpdate(RetentionUpdate u) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeInteger(u.getRetentionTime());
w.writeInteger(u.getVersion());
w.writeListEnd();
writePacket(RETENTION_UPDATE);
}
public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeInteger(a.getVersion());
w.writeListEnd();
writePacket(SUBSCRIPTION_ACK);
}
public void writeSubscriptionUpdate(SubscriptionUpdate u)
throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (Group g : u.getGroups()) {
w.writeListStart();
w.writeString(g.getName());
w.writeRaw(g.getSalt());
w.writeListEnd();
}
w.writeListEnd();
w.writeInteger(u.getVersion());
w.writeListEnd();
writePacket(SUBSCRIPTION_UPDATE);
}
public void writeTransportAck(TransportAck a) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeString(a.getId().getString());
w.writeInteger(a.getVersion());
w.writeListEnd();
writePacket(TRANSPORT_ACK);
}
public void writeTransportUpdate(TransportUpdate u) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeString(u.getId().getString());
w.writeMap(u.getProperties());
w.writeInteger(u.getVersion());
w.writeListEnd();
writePacket(TRANSPORT_UPDATE);
}
public void flush() throws IOException {
out.flush();
}
}

View File

@@ -0,0 +1,22 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.Signature;
import org.briarproject.api.data.Consumer;
/** A consumer that passes its input through a signature. */
class SigningConsumer implements Consumer {
private final Signature signature;
public SigningConsumer(Signature signature) {
this.signature = signature;
}
public void write(byte b) {
signature.update(b);
}
public void write(byte[] b, int off, int len) {
signature.update(b, off, len);
}
}

View File

@@ -0,0 +1,418 @@
package org.briarproject.sync;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
/**
* An outgoing {@link MessagingSession
* MessagingSession} suitable for simplex transports. The session sends
* messages without offering them, and closes its output stream when there are
* no more packets to send.
*/
class SimplexOutgoingSession implements MessagingSession, EventListener {
private static final Logger LOG =
Logger.getLogger(SimplexOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE =
new ThrowingRunnable<IOException>() {
public void run() {}
};
private final DatabaseComponent db;
private final Executor dbExecutor;
private final EventBus eventBus;
private final ContactId contactId;
private final TransportId transportId;
private final int maxLatency;
private final PacketWriter packetWriter;
private final AtomicInteger outstandingQueries;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, TransportId transportId,
int maxLatency, PacketWriter packetWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
this.packetWriter = packetWriter;
outstandingQueries = new AtomicInteger(8); // One per type of packet
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
public void run() throws IOException {
eventBus.addListener(this);
try {
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
// Write packets until interrupted or no more packets to write
try {
while (!interrupted) {
ThrowingRunnable<IOException> task = writerTasks.take();
if (task == CLOSE) break;
task.run();
}
packetWriter.flush();
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt();
}
} finally {
eventBus.removeListener(this);
}
}
public void interrupt() {
interrupted = true;
writerTasks.add(CLOSE);
}
private void decrementOutstandingQueries() {
if (outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE);
}
public void eventOccurred(Event e) {
if (e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof ShutdownEvent) {
interrupt();
} else if (e instanceof TransportRemovedEvent) {
TransportRemovedEvent t = (TransportRemovedEvent) e;
if (t.getTransportId().equals(transportId)) interrupt();
}
}
// This task runs on the database thread
private class GenerateAck implements Runnable {
public void run() {
if (interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteAck implements ThrowingRunnable<IOException> {
private final Ack ack;
private WriteAck(Ack ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeAck(ack);
LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck());
}
}
// This task runs on the database thread
private class GenerateBatch implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<byte[]> b = db.generateBatch(contactId,
MAX_PAYLOAD_LENGTH, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();
else writerTasks.add(new WriteBatch(b));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch;
private WriteBatch(Collection<byte[]> batch) {
this.batch = batch;
}
public void run() throws IOException {
if (interrupted) return;
for (byte[] raw : batch) packetWriter.writeMessage(raw);
LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
}
}
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionAck a = db.generateRetentionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteRetentionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements ThrowingRunnable<IOException> {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
}
}
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if (u == null) decrementOutstandingQueries();
else writerTasks.add(new WriteRetentionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate
implements ThrowingRunnable<IOException> {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteSubscriptionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck
implements ThrowingRunnable<IOException> {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
}
}
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if (u == null) decrementOutstandingQueries();
else writerTasks.add(new WriteSubscriptionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate
implements ThrowingRunnable<IOException> {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if (acks == null) decrementOutstandingQueries();
else writerTasks.add(new WriteTransportAcks(acks));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements ThrowingRunnable<IOException> {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
}
}
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if (t == null) decrementOutstandingQueries();
else writerTasks.add(new WriteTransportUpdates(t));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates
implements ThrowingRunnable<IOException> {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
}
}
}

View File

@@ -0,0 +1,56 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.data.Consumer;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.data.Reader;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.SubscriptionUpdate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.MessagingConstants.MAX_SUBSCRIPTIONS;
class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
private final ObjectReader<Group> groupReader;
SubscriptionUpdateReader(ObjectReader<Group> groupReader) {
this.groupReader = groupReader;
}
public SubscriptionUpdate readObject(Reader r) throws IOException {
// Set up the reader
Consumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
r.addConsumer(counting);
// Read the start of the update
r.readListStart();
// Read the subscriptions, rejecting duplicates
List<Group> groups = new ArrayList<Group>();
Set<GroupId> ids = new HashSet<GroupId>();
r.readListStart();
for (int i = 0; i < MAX_SUBSCRIPTIONS && !r.hasListEnd(); i++) {
Group g = groupReader.readObject(r);
if (!ids.add(g.getId())) throw new FormatException(); // Duplicate
groups.add(g);
}
r.readListEnd();
// Read the version number
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the update
r.readListEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the subscription update
groups = Collections.unmodifiableList(groups);
return new SubscriptionUpdate(groups, version);
}
}

View File

@@ -0,0 +1,6 @@
package org.briarproject.sync;
interface ThrowingRunnable<T extends Throwable> {
void run() throws T;
}