Merge branch '628-bring-protocols-into-line-with-spec' into 'master'

Bring protocols in line with spec

Closes #628

See merge request !465
This commit is contained in:
akwizgran
2016-12-21 12:52:43 +00:00
34 changed files with 373 additions and 320 deletions

View File

@@ -3,7 +3,7 @@ package org.briarproject.bramble.api.sync;
import java.util.Collection; import java.util.Collection;
/** /**
* A packet acknowledging receipt of one or more {@link Message Messages}. * A record acknowledging receipt of one or more {@link Message Messages}.
*/ */
public class Ack { public class Ack {

View File

@@ -1,5 +1,7 @@
package org.briarproject.bramble.api.sync; package org.briarproject.bramble.api.sync;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
public class Group { public class Group {
public enum Visibility { public enum Visibility {
@@ -13,6 +15,8 @@ public class Group {
private final byte[] descriptor; private final byte[] descriptor;
public Group(GroupId id, ClientId clientId, byte[] descriptor) { public Group(GroupId id, ClientId clientId, byte[] descriptor) {
if (descriptor.length > MAX_GROUP_DESCRIPTOR_LENGTH)
throw new IllegalArgumentException();
this.id = id; this.id = id;
this.clientId = clientId; this.clientId = clientId;
this.descriptor = descriptor; this.descriptor = descriptor;

View File

@@ -3,7 +3,7 @@ package org.briarproject.bramble.api.sync;
import java.util.Collection; import java.util.Collection;
/** /**
* A packet offering the recipient one or more {@link Message Messages}. * A record offering the recipient one or more {@link Message Messages}.
*/ */
public class Offer { public class Offer {

View File

@@ -5,7 +5,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.IOException; import java.io.IOException;
@NotNullByDefault @NotNullByDefault
public interface PacketReader { public interface RecordReader {
boolean eof() throws IOException; boolean eof() throws IOException;
@@ -24,4 +24,5 @@ public interface PacketReader {
boolean hasRequest() throws IOException; boolean hasRequest() throws IOException;
Request readRequest() throws IOException; Request readRequest() throws IOException;
} }

View File

@@ -5,7 +5,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.InputStream; import java.io.InputStream;
@NotNullByDefault @NotNullByDefault
public interface PacketReaderFactory { public interface RecordReaderFactory {
PacketReader createPacketReader(InputStream in); RecordReader createRecordReader(InputStream in);
} }

View File

@@ -1,12 +1,13 @@
package org.briarproject.bramble.api.sync; package org.briarproject.bramble.api.sync;
/** /**
* Packet types for the sync protocol. * Record types for the sync protocol.
*/ */
public interface PacketTypes { public interface RecordTypes {
byte ACK = 0; byte ACK = 0;
byte MESSAGE = 1; byte MESSAGE = 1;
byte OFFER = 2; byte OFFER = 2;
byte REQUEST = 3; byte REQUEST = 3;
} }

View File

@@ -5,7 +5,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.IOException; import java.io.IOException;
@NotNullByDefault @NotNullByDefault
public interface PacketWriter { public interface RecordWriter {
void writeAck(Ack a) throws IOException; void writeAck(Ack a) throws IOException;

View File

@@ -5,7 +5,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.OutputStream; import java.io.OutputStream;
@NotNullByDefault @NotNullByDefault
public interface PacketWriterFactory { public interface RecordWriterFactory {
PacketWriter createPacketWriter(OutputStream out); RecordWriter createRecordWriter(OutputStream out);
} }

View File

@@ -3,7 +3,7 @@ package org.briarproject.bramble.api.sync;
import java.util.Collection; import java.util.Collection;
/** /**
* A packet requesting one or more {@link Message Messages} from the recipient. * A record requesting one or more {@link Message Messages} from the recipient.
*/ */
public class Request { public class Request {

View File

@@ -10,19 +10,17 @@ public interface SyncConstants {
byte PROTOCOL_VERSION = 0; byte PROTOCOL_VERSION = 0;
/** /**
* The length of the packet header in bytes. * The length of the record header in bytes.
*/ */
int PACKET_HEADER_LENGTH = 4; int RECORD_HEADER_LENGTH = 4;
/** /**
* The maximum length of the packet payload in bytes. * The maximum length of the record payload in bytes.
*/ */
int MAX_PACKET_PAYLOAD_LENGTH = 32 * 1024; // 32 KiB int MAX_RECORD_PAYLOAD_LENGTH = 48 * 1024; // 48 KiB
/** /** The maximum length of a group descriptor in bytes. */
* The maximum length of a message in bytes. int MAX_GROUP_DESCRIPTOR_LENGTH = 16 * 1024; // 16 KiB
*/
int MAX_MESSAGE_LENGTH = MAX_PACKET_PAYLOAD_LENGTH - PACKET_HEADER_LENGTH;
/** /**
* The length of the message header in bytes. * The length of the message header in bytes.
@@ -32,10 +30,15 @@ public interface SyncConstants {
/** /**
* The maximum length of a message body in bytes. * The maximum length of a message body in bytes.
*/ */
int MAX_MESSAGE_BODY_LENGTH = MAX_MESSAGE_LENGTH - MESSAGE_HEADER_LENGTH; int MAX_MESSAGE_BODY_LENGTH = 32 * 1024; // 32 KiB
/** /**
* The maximum number of message IDs in an ack, offer or request packet. * The maximum length of a message in bytes.
*/ */
int MAX_MESSAGE_IDS = MAX_PACKET_PAYLOAD_LENGTH / UniqueId.LENGTH; int MAX_MESSAGE_LENGTH = MESSAGE_HEADER_LENGTH + MAX_MESSAGE_BODY_LENGTH;
/**
* The maximum number of message IDs in an ack, offer or request record.
*/
int MAX_MESSAGE_IDS = MAX_RECORD_PAYLOAD_LENGTH / UniqueId.LENGTH;
} }

View File

@@ -5,7 +5,7 @@ import java.io.IOException;
public interface SyncSession { public interface SyncSession {
/** /**
* Runs the session. This method returns when there are no more packets to * Runs the session. This method returns when there are no more records to
* send or receive, or when the {@link #interrupt()} method has been called. * send or receive, or when the {@link #interrupt()} method has been called.
*/ */
void run() throws IOException; void run() throws IOException;

View File

@@ -52,7 +52,7 @@ class KeyAgreementProtocol {
void connectionWaiting(); void connectionWaiting();
void initialPacketReceived(); void initialRecordReceived();
} }
private final Callbacks callbacks; private final Callbacks callbacks;
@@ -117,7 +117,7 @@ class KeyAgreementProtocol {
private byte[] receiveKey() throws AbortException { private byte[] receiveKey() throws AbortException {
byte[] publicKey = transport.receiveKey(); byte[] publicKey = transport.receiveKey();
callbacks.initialPacketReceived(); callbacks.initialRecordReceived();
byte[] expected = crypto.deriveKeyCommitment(publicKey); byte[] expected = crypto.deriveKeyCommitment(publicKey);
if (!Arrays.equals(expected, theirPayload.getCommitment())) if (!Arrays.equals(expected, theirPayload.getCommitment()))
throw new AbortException(); throw new AbortException();

View File

@@ -129,7 +129,7 @@ class KeyAgreementTaskImpl extends Thread implements
} }
@Override @Override
public void initialPacketReceived() { public void initialRecordReceived() {
// We send this here instead of when we create the protocol, so that // We send this here instead of when we create the protocol, so that
// if device A makes a connection after getting device B's payload and // if device A makes a connection after getting device B's payload and
// starts its protocol, device A's UI doesn't change to prevent device B // starts its protocol, device A's UI doesn't change to prevent device B

View File

@@ -95,20 +95,34 @@ class KeyAgreementTransport {
out.flush(); out.flush();
} }
private byte[] readRecord(byte type) throws AbortException { private byte[] readRecord(byte expectedType) throws AbortException {
byte[] header = readHeader(); while (true) {
if (header[0] != PROTOCOL_VERSION) byte[] header = readHeader();
throw new AbortException(); // TODO handle? int len = ByteUtils.readUint16(header,
if (header[1] != type) { RECORD_HEADER_PAYLOAD_LENGTH_OFFSET);
// Unexpected packet if (header[0] != PROTOCOL_VERSION) {
throw new AbortException(header[1] == ABORT); throw new AbortException(false);
} }
int len = ByteUtils.readUint16(header, byte type = header[1];
RECORD_HEADER_PAYLOAD_LENGTH_OFFSET); if (type == ABORT) throw new AbortException(true);
try { if (type != expectedType) {
return readData(len); if (type != KEY && type != CONFIRM) {
} catch (IOException e) { // ignore unrecognised record and try next
throw new AbortException(e); try {
readData(len);
} catch (IOException e) {
throw new AbortException(e);
}
continue;
} else {
throw new AbortException(false);
}
}
try {
return readData(len);
} catch (IOException e) {
throw new AbortException(e);
}
} }
} }

View File

@@ -14,7 +14,7 @@ import org.briarproject.bramble.api.lifecycle.event.ShutdownEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.PacketWriter; import org.briarproject.bramble.api.sync.RecordWriter;
import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
@@ -37,19 +37,19 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
/** /**
* An outgoing {@link SyncSession} suitable for duplex transports. The session * An outgoing {@link SyncSession} suitable for duplex transports. The session
* offers messages before sending them, keeps its output stream open when there * offers messages before sending them, keeps its output stream open when there
* are no packets to send, and reacts to events that make packets available to * are no records to send, and reacts to events that make records available to
* send. * send.
*/ */
@ThreadSafe @ThreadSafe
@NotNullByDefault @NotNullByDefault
class DuplexOutgoingSession implements SyncSession, EventListener { class DuplexOutgoingSession implements SyncSession, EventListener {
// Check for retransmittable packets once every 60 seconds // Check for retransmittable records once every 60 seconds
private static final int RETX_QUERY_INTERVAL = 60 * 1000; private static final int RETX_QUERY_INTERVAL = 60 * 1000;
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName()); Logger.getLogger(DuplexOutgoingSession.class.getName());
@@ -67,14 +67,14 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final Clock clock; private final Clock clock;
private final ContactId contactId; private final ContactId contactId;
private final int maxLatency, maxIdleTime; private final int maxLatency, maxIdleTime;
private final PacketWriter packetWriter; private final RecordWriter recordWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency, EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
int maxIdleTime, PacketWriter packetWriter) { int maxIdleTime, RecordWriter recordWriter) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
@@ -82,7 +82,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
this.contactId = contactId; this.contactId = contactId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime; this.maxIdleTime = maxIdleTime;
this.packetWriter = packetWriter; this.recordWriter = recordWriter;
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
} }
@@ -91,7 +91,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
public void run() throws IOException { public void run() throws IOException {
eventBus.addListener(this); eventBus.addListener(this);
try { try {
// Start a query for each type of packet // Start a query for each type of record
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
@@ -100,33 +100,33 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
long nextKeepalive = now + maxIdleTime; long nextKeepalive = now + maxIdleTime;
long nextRetxQuery = now + RETX_QUERY_INTERVAL; long nextRetxQuery = now + RETX_QUERY_INTERVAL;
boolean dataToFlush = true; boolean dataToFlush = true;
// Write packets until interrupted // Write records until interrupted
try { try {
while (!interrupted) { while (!interrupted) {
// Work out how long we should wait for a packet // Work out how long we should wait for a record
now = clock.currentTimeMillis(); now = clock.currentTimeMillis();
long wait = Math.min(nextKeepalive, nextRetxQuery) - now; long wait = Math.min(nextKeepalive, nextRetxQuery) - now;
if (wait < 0) wait = 0; if (wait < 0) wait = 0;
// Flush any unflushed data if we're going to wait // Flush any unflushed data if we're going to wait
if (wait > 0 && dataToFlush && writerTasks.isEmpty()) { if (wait > 0 && dataToFlush && writerTasks.isEmpty()) {
packetWriter.flush(); recordWriter.flush();
dataToFlush = false; dataToFlush = false;
nextKeepalive = now + maxIdleTime; nextKeepalive = now + maxIdleTime;
} }
// Wait for a packet // Wait for a record
ThrowingRunnable<IOException> task = writerTasks.poll(wait, ThrowingRunnable<IOException> task = writerTasks.poll(wait,
MILLISECONDS); MILLISECONDS);
if (task == null) { if (task == null) {
now = clock.currentTimeMillis(); now = clock.currentTimeMillis();
if (now >= nextRetxQuery) { if (now >= nextRetxQuery) {
// Check for retransmittable packets // Check for retransmittable records
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
nextRetxQuery = now + RETX_QUERY_INTERVAL; nextRetxQuery = now + RETX_QUERY_INTERVAL;
} }
if (now >= nextKeepalive) { if (now >= nextKeepalive) {
// Flush the stream to keep it alive // Flush the stream to keep it alive
packetWriter.flush(); recordWriter.flush();
dataToFlush = false; dataToFlush = false;
nextKeepalive = now + maxIdleTime; nextKeepalive = now + maxIdleTime;
} }
@@ -137,9 +137,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dataToFlush = true; dataToFlush = true;
} }
} }
if (dataToFlush) packetWriter.flush(); if (dataToFlush) recordWriter.flush();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write"); LOG.info("Interrupted while waiting for a record to write");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} finally { } finally {
@@ -215,7 +215,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
packetWriter.writeAck(ack); recordWriter.writeAck(ack);
LOG.info("Sent ack"); LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateAck());
} }
@@ -232,7 +232,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
try { try {
b = db.generateRequestedBatch(txn, contactId, b = db.generateRequestedBatch(txn, contactId,
MAX_PACKET_PAYLOAD_LENGTH, maxLatency); MAX_RECORD_PAYLOAD_LENGTH, maxLatency);
db.commitTransaction(txn); db.commitTransaction(txn);
} finally { } finally {
db.endTransaction(txn); db.endTransaction(txn);
@@ -259,7 +259,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
for (byte[] raw : batch) packetWriter.writeMessage(raw); for (byte[] raw : batch) recordWriter.writeMessage(raw);
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
} }
@@ -303,7 +303,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
packetWriter.writeOffer(offer); recordWriter.writeOffer(offer);
LOG.info("Sent offer"); LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
} }
@@ -346,7 +346,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
packetWriter.writeRequest(request); recordWriter.writeRequest(request);
LOG.info("Sent request"); LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest()); dbExecutor.execute(new GenerateRequest());
} }

View File

@@ -16,7 +16,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.PacketReader; import org.briarproject.bramble.api.sync.RecordReader;
import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
@@ -42,18 +42,18 @@ class IncomingSession implements SyncSession, EventListener {
private final Executor dbExecutor; private final Executor dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final ContactId contactId; private final ContactId contactId;
private final PacketReader packetReader; private final RecordReader recordReader;
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
IncomingSession(DatabaseComponent db, Executor dbExecutor, IncomingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, EventBus eventBus, ContactId contactId,
PacketReader packetReader) { RecordReader recordReader) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.contactId = contactId; this.contactId = contactId;
this.packetReader = packetReader; this.recordReader = recordReader;
} }
@IoExecutor @IoExecutor
@@ -61,21 +61,22 @@ class IncomingSession implements SyncSession, EventListener {
public void run() throws IOException { public void run() throws IOException {
eventBus.addListener(this); eventBus.addListener(this);
try { try {
// Read packets until interrupted or EOF // Read records until interrupted or EOF
while (!interrupted && !packetReader.eof()) { while (!interrupted && !recordReader.eof()) {
if (packetReader.hasAck()) { if (recordReader.hasAck()) {
Ack a = packetReader.readAck(); Ack a = recordReader.readAck();
dbExecutor.execute(new ReceiveAck(a)); dbExecutor.execute(new ReceiveAck(a));
} else if (packetReader.hasMessage()) { } else if (recordReader.hasMessage()) {
Message m = packetReader.readMessage(); Message m = recordReader.readMessage();
dbExecutor.execute(new ReceiveMessage(m)); dbExecutor.execute(new ReceiveMessage(m));
} else if (packetReader.hasOffer()) { } else if (recordReader.hasOffer()) {
Offer o = packetReader.readOffer(); Offer o = recordReader.readOffer();
dbExecutor.execute(new ReceiveOffer(o)); dbExecutor.execute(new ReceiveOffer(o));
} else if (packetReader.hasRequest()) { } else if (recordReader.hasRequest()) {
Request r = packetReader.readRequest(); Request r = recordReader.readRequest();
dbExecutor.execute(new ReceiveRequest(r)); dbExecutor.execute(new ReceiveRequest(r));
} else { } else {
// unknown records are ignored in RecordReader#eof()
throw new FormatException(); throw new FormatException();
} }
} }

View File

@@ -30,11 +30,15 @@ class MessageFactoryImpl implements MessageFactory {
public Message createMessage(GroupId g, long timestamp, byte[] body) { public Message createMessage(GroupId g, long timestamp, byte[] body) {
if (body.length > MAX_MESSAGE_BODY_LENGTH) if (body.length > MAX_MESSAGE_BODY_LENGTH)
throw new IllegalArgumentException(); throw new IllegalArgumentException();
byte[] timeBytes = new byte[ByteUtils.INT_64_BYTES];
ByteUtils.writeUint64(timestamp, timeBytes, 0);
byte[] idHash =
crypto.hash(MessageId.LABEL, g.getBytes(), timeBytes, body);
MessageId id = new MessageId(idHash);
byte[] raw = new byte[MESSAGE_HEADER_LENGTH + body.length]; byte[] raw = new byte[MESSAGE_HEADER_LENGTH + body.length];
System.arraycopy(g.getBytes(), 0, raw, 0, UniqueId.LENGTH); System.arraycopy(g.getBytes(), 0, raw, 0, UniqueId.LENGTH);
ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH); ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH);
System.arraycopy(body, 0, raw, MESSAGE_HEADER_LENGTH, body.length); System.arraycopy(body, 0, raw, MESSAGE_HEADER_LENGTH, body.length);
MessageId id = new MessageId(crypto.hash(MessageId.LABEL, raw));
return new Message(id, g, timestamp, raw); return new Message(id, g, timestamp, raw);
} }

View File

@@ -1,28 +0,0 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.crypto.CryptoComponent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.PacketReader;
import org.briarproject.bramble.api.sync.PacketReaderFactory;
import java.io.InputStream;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
@Immutable
@NotNullByDefault
class PacketReaderFactoryImpl implements PacketReaderFactory {
private final CryptoComponent crypto;
@Inject
PacketReaderFactoryImpl(CryptoComponent crypto) {
this.crypto = crypto;
}
@Override
public PacketReader createPacketReader(InputStream in) {
return new PacketReaderImpl(crypto, in);
}
}

View File

@@ -1,16 +0,0 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.PacketWriter;
import org.briarproject.bramble.api.sync.PacketWriterFactory;
import java.io.OutputStream;
@NotNullByDefault
class PacketWriterFactoryImpl implements PacketWriterFactory {
@Override
public PacketWriter createPacketWriter(OutputStream out) {
return new PacketWriterImpl(out);
}
}

View File

@@ -0,0 +1,28 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.RecordReader;
import org.briarproject.bramble.api.sync.RecordReaderFactory;
import java.io.InputStream;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
@Immutable
@NotNullByDefault
class RecordReaderFactoryImpl implements RecordReaderFactory {
private final MessageFactory messageFactory;
@Inject
RecordReaderFactoryImpl(MessageFactory messageFactory) {
this.messageFactory = messageFactory;
}
@Override
public RecordReader createRecordReader(InputStream in) {
return new RecordReaderImpl(messageFactory, in);
}
}

View File

@@ -2,14 +2,14 @@ package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.FormatException; import org.briarproject.bramble.api.FormatException;
import org.briarproject.bramble.api.UniqueId; import org.briarproject.bramble.api.UniqueId;
import org.briarproject.bramble.api.crypto.CryptoComponent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.PacketReader; import org.briarproject.bramble.api.sync.RecordReader;
import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.util.ByteUtils; import org.briarproject.bramble.util.ByteUtils;
@@ -20,66 +20,83 @@ import java.util.List;
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
import static org.briarproject.bramble.api.sync.PacketTypes.ACK; import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
import static org.briarproject.bramble.api.sync.PacketTypes.MESSAGE; import static org.briarproject.bramble.api.sync.RecordTypes.MESSAGE;
import static org.briarproject.bramble.api.sync.PacketTypes.OFFER; import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
import static org.briarproject.bramble.api.sync.PacketTypes.REQUEST; import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION; import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION;
import static org.briarproject.bramble.api.sync.SyncConstants.RECORD_HEADER_LENGTH;
@NotThreadSafe @NotThreadSafe
@NotNullByDefault @NotNullByDefault
class PacketReaderImpl implements PacketReader { class RecordReaderImpl implements RecordReader {
private enum State { BUFFER_EMPTY, BUFFER_FULL, EOF } private enum State { BUFFER_EMPTY, BUFFER_FULL, EOF }
private final CryptoComponent crypto; private final MessageFactory messageFactory;
private final InputStream in; private final InputStream in;
private final byte[] header, payload; private final byte[] header, payload;
private State state = State.BUFFER_EMPTY; private State state = State.BUFFER_EMPTY;
private int payloadLength = 0; private int payloadLength = 0;
PacketReaderImpl(CryptoComponent crypto, InputStream in) { RecordReaderImpl(MessageFactory messageFactory, InputStream in) {
this.crypto = crypto; this.messageFactory = messageFactory;
this.in = in; this.in = in;
header = new byte[PACKET_HEADER_LENGTH]; header = new byte[RECORD_HEADER_LENGTH];
payload = new byte[MAX_PACKET_PAYLOAD_LENGTH]; payload = new byte[MAX_RECORD_PAYLOAD_LENGTH];
} }
private void readPacket() throws IOException { private void readRecord() throws IOException {
if (state != State.BUFFER_EMPTY) throw new IllegalStateException(); if (state != State.BUFFER_EMPTY) throw new IllegalStateException();
// Read the header while (true) {
int offset = 0; // Read the header
while (offset < PACKET_HEADER_LENGTH) { int offset = 0;
int read = in.read(header, offset, PACKET_HEADER_LENGTH - offset); while (offset < RECORD_HEADER_LENGTH) {
if (read == -1) { int read =
if (offset > 0) throw new FormatException(); in.read(header, offset, RECORD_HEADER_LENGTH - offset);
state = State.EOF; if (read == -1) {
if (offset > 0) throw new FormatException();
state = State.EOF;
return;
}
offset += read;
}
// Check the protocol version
if (header[0] != PROTOCOL_VERSION) throw new FormatException();
// Read the payload length
payloadLength = ByteUtils.readUint16(header, 2);
if (payloadLength > MAX_RECORD_PAYLOAD_LENGTH)
throw new FormatException();
// Read the payload
offset = 0;
while (offset < payloadLength) {
int read = in.read(payload, offset, payloadLength - offset);
if (read == -1) throw new FormatException();
offset += read;
}
state = State.BUFFER_FULL;
// Return if this is a known record type
if (header[1] == ACK || header[1] == MESSAGE ||
header[1] == OFFER || header[1] == REQUEST) {
return; return;
} }
offset += read;
} }
// Check the protocol version
if (header[0] != PROTOCOL_VERSION) throw new FormatException();
// Read the payload length
payloadLength = ByteUtils.readUint16(header, 2);
if (payloadLength > MAX_PACKET_PAYLOAD_LENGTH) throw new FormatException();
// Read the payload
offset = 0;
while (offset < payloadLength) {
int read = in.read(payload, offset, payloadLength - offset);
if (read == -1) throw new FormatException();
offset += read;
}
state = State.BUFFER_FULL;
} }
/**
* The return value indicates whether there's another record available
* or whether we've reached the end of the input stream.
* If a record is available,
* it's been read into the buffer by the time eof() returns,
* so the method that called eof() can access the record from the buffer,
* for example to check its type or extract its payload.
*/
@Override @Override
public boolean eof() throws IOException { public boolean eof() throws IOException {
if (state == State.BUFFER_EMPTY) readPacket(); if (state == State.BUFFER_EMPTY) readRecord();
if (state == State.BUFFER_EMPTY) throw new IllegalStateException(); if (state == State.BUFFER_EMPTY) throw new IllegalStateException();
return state == State.EOF; return state == State.EOF;
} }
@@ -124,13 +141,12 @@ class PacketReaderImpl implements PacketReader {
// Timestamp // Timestamp
long timestamp = ByteUtils.readUint64(payload, UniqueId.LENGTH); long timestamp = ByteUtils.readUint64(payload, UniqueId.LENGTH);
if (timestamp < 0) throw new FormatException(); if (timestamp < 0) throw new FormatException();
// Raw message // Body
byte[] raw = new byte[payloadLength]; byte[] body = new byte[payloadLength - MESSAGE_HEADER_LENGTH];
System.arraycopy(payload, 0, raw, 0, payloadLength); System.arraycopy(payload, MESSAGE_HEADER_LENGTH, body, 0,
payloadLength - MESSAGE_HEADER_LENGTH);
state = State.BUFFER_EMPTY; state = State.BUFFER_EMPTY;
// Message ID return messageFactory.createMessage(groupId, timestamp, body);
MessageId messageId = new MessageId(crypto.hash(MessageId.LABEL, raw));
return new Message(messageId, groupId, timestamp, raw);
} }
@Override @Override
@@ -154,4 +170,5 @@ class PacketReaderImpl implements PacketReader {
if (!hasRequest()) throw new FormatException(); if (!hasRequest()) throw new FormatException();
return new Request(readMessageIds()); return new Request(readMessageIds());
} }
} }

View File

@@ -0,0 +1,16 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.RecordWriter;
import org.briarproject.bramble.api.sync.RecordWriterFactory;
import java.io.OutputStream;
@NotNullByDefault
class RecordWriterFactoryImpl implements RecordWriterFactory {
@Override
public RecordWriter createRecordWriter(OutputStream out) {
return new RecordWriterImpl(out);
}
}

View File

@@ -4,8 +4,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.PacketTypes; import org.briarproject.bramble.api.sync.RecordTypes;
import org.briarproject.bramble.api.sync.PacketWriter; import org.briarproject.bramble.api.sync.RecordWriter;
import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.util.ByteUtils; import org.briarproject.bramble.util.ByteUtils;
@@ -15,30 +15,30 @@ import java.io.OutputStream;
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
import static org.briarproject.bramble.api.sync.PacketTypes.ACK; import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
import static org.briarproject.bramble.api.sync.PacketTypes.OFFER; import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
import static org.briarproject.bramble.api.sync.PacketTypes.REQUEST; import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PACKET_HEADER_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.RECORD_HEADER_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION; import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION;
@NotThreadSafe @NotThreadSafe
@NotNullByDefault @NotNullByDefault
class PacketWriterImpl implements PacketWriter { class RecordWriterImpl implements RecordWriter {
private final OutputStream out; private final OutputStream out;
private final byte[] header; private final byte[] header;
private final ByteArrayOutputStream payload; private final ByteArrayOutputStream payload;
PacketWriterImpl(OutputStream out) { RecordWriterImpl(OutputStream out) {
this.out = out; this.out = out;
header = new byte[PACKET_HEADER_LENGTH]; header = new byte[RECORD_HEADER_LENGTH];
header[0] = PROTOCOL_VERSION; header[0] = PROTOCOL_VERSION;
payload = new ByteArrayOutputStream(MAX_PACKET_PAYLOAD_LENGTH); payload = new ByteArrayOutputStream(MAX_RECORD_PAYLOAD_LENGTH);
} }
private void writePacket(byte packetType) throws IOException { private void writeRecord(byte recordType) throws IOException {
header[1] = packetType; header[1] = recordType;
ByteUtils.writeUint16(payload.size(), header, 2); ByteUtils.writeUint16(payload.size(), header, 2);
out.write(header); out.write(header);
payload.writeTo(out); payload.writeTo(out);
@@ -49,12 +49,12 @@ class PacketWriterImpl implements PacketWriter {
public void writeAck(Ack a) throws IOException { public void writeAck(Ack a) throws IOException {
if (payload.size() != 0) throw new IllegalStateException(); if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : a.getMessageIds()) payload.write(m.getBytes()); for (MessageId m : a.getMessageIds()) payload.write(m.getBytes());
writePacket(ACK); writeRecord(ACK);
} }
@Override @Override
public void writeMessage(byte[] raw) throws IOException { public void writeMessage(byte[] raw) throws IOException {
header[1] = PacketTypes.MESSAGE; header[1] = RecordTypes.MESSAGE;
ByteUtils.writeUint16(raw.length, header, 2); ByteUtils.writeUint16(raw.length, header, 2);
out.write(header); out.write(header);
out.write(raw); out.write(raw);
@@ -64,14 +64,14 @@ class PacketWriterImpl implements PacketWriter {
public void writeOffer(Offer o) throws IOException { public void writeOffer(Offer o) throws IOException {
if (payload.size() != 0) throw new IllegalStateException(); if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : o.getMessageIds()) payload.write(m.getBytes()); for (MessageId m : o.getMessageIds()) payload.write(m.getBytes());
writePacket(OFFER); writeRecord(OFFER);
} }
@Override @Override
public void writeRequest(Request r) throws IOException { public void writeRequest(Request r) throws IOException {
if (payload.size() != 0) throw new IllegalStateException(); if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : r.getMessageIds()) payload.write(m.getBytes()); for (MessageId m : r.getMessageIds()) payload.write(m.getBytes());
writePacket(REQUEST); writeRecord(REQUEST);
} }
@Override @Override

View File

@@ -13,7 +13,7 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.ShutdownEvent; import org.briarproject.bramble.api.lifecycle.event.ShutdownEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.PacketWriter; import org.briarproject.bramble.api.sync.RecordWriter;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import java.io.IOException; import java.io.IOException;
@@ -29,12 +29,12 @@ import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
/** /**
* An outgoing {@link SyncSession} suitable for simplex transports. The session * An outgoing {@link SyncSession} suitable for simplex transports. The session
* sends messages without offering them first, and closes its output stream * sends messages without offering them first, and closes its output stream
* when there are no more packets to send. * when there are no more records to send.
*/ */
@ThreadSafe @ThreadSafe
@NotNullByDefault @NotNullByDefault
@@ -55,7 +55,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final EventBus eventBus; private final EventBus eventBus;
private final ContactId contactId; private final ContactId contactId;
private final int maxLatency; private final int maxLatency;
private final PacketWriter packetWriter; private final RecordWriter recordWriter;
private final AtomicInteger outstandingQueries; private final AtomicInteger outstandingQueries;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
@@ -63,14 +63,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, EventBus eventBus, ContactId contactId,
int maxLatency, PacketWriter packetWriter) { int maxLatency, RecordWriter recordWriter) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.contactId = contactId; this.contactId = contactId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.packetWriter = packetWriter; this.recordWriter = recordWriter;
outstandingQueries = new AtomicInteger(2); // One per type of packet outstandingQueries = new AtomicInteger(2); // One per type of record
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
} }
@@ -79,19 +79,19 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() throws IOException { public void run() throws IOException {
eventBus.addListener(this); eventBus.addListener(this);
try { try {
// Start a query for each type of packet // Start a query for each type of record
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
// Write packets until interrupted or no more packets to write // Write records until interrupted or no more records to write
try { try {
while (!interrupted) { while (!interrupted) {
ThrowingRunnable<IOException> task = writerTasks.take(); ThrowingRunnable<IOException> task = writerTasks.take();
if (task == CLOSE) break; if (task == CLOSE) break;
task.run(); task.run();
} }
packetWriter.flush(); recordWriter.flush();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write"); LOG.info("Interrupted while waiting for a record to write");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} finally { } finally {
@@ -157,7 +157,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
packetWriter.writeAck(ack); recordWriter.writeAck(ack);
LOG.info("Sent ack"); LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateAck());
} }
@@ -174,7 +174,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
try { try {
b = db.generateBatch(txn, contactId, b = db.generateBatch(txn, contactId,
MAX_PACKET_PAYLOAD_LENGTH, maxLatency); MAX_RECORD_PAYLOAD_LENGTH, maxLatency);
db.commitTransaction(txn); db.commitTransaction(txn);
} finally { } finally {
db.endTransaction(txn); db.endTransaction(txn);
@@ -202,7 +202,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
for (byte[] raw : batch) packetWriter.writeMessage(raw); for (byte[] raw : batch) recordWriter.writeMessage(raw);
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
} }

View File

@@ -7,8 +7,8 @@ import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.sync.GroupFactory; import org.briarproject.bramble.api.sync.GroupFactory;
import org.briarproject.bramble.api.sync.MessageFactory; import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.PacketReaderFactory; import org.briarproject.bramble.api.sync.RecordReaderFactory;
import org.briarproject.bramble.api.sync.PacketWriterFactory; import org.briarproject.bramble.api.sync.RecordWriterFactory;
import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.sync.ValidationManager; import org.briarproject.bramble.api.sync.ValidationManager;
import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.Clock;
@@ -40,23 +40,24 @@ public class SyncModule {
} }
@Provides @Provides
PacketReaderFactory providePacketReaderFactory(CryptoComponent crypto) { RecordReaderFactory provideRecordReaderFactory(
return new PacketReaderFactoryImpl(crypto); RecordReaderFactoryImpl recordReaderFactory) {
return recordReaderFactory;
} }
@Provides @Provides
PacketWriterFactory providePacketWriterFactory() { RecordWriterFactory provideRecordWriterFactory() {
return new PacketWriterFactoryImpl(); return new RecordWriterFactoryImpl();
} }
@Provides @Provides
@Singleton @Singleton
SyncSessionFactory provideSyncSessionFactory(DatabaseComponent db, SyncSessionFactory provideSyncSessionFactory(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor, EventBus eventBus, @DatabaseExecutor Executor dbExecutor, EventBus eventBus,
Clock clock, PacketReaderFactory packetReaderFactory, Clock clock, RecordReaderFactory recordReaderFactory,
PacketWriterFactory packetWriterFactory) { RecordWriterFactory recordWriterFactory) {
return new SyncSessionFactoryImpl(db, dbExecutor, eventBus, clock, return new SyncSessionFactoryImpl(db, dbExecutor, eventBus, clock,
packetReaderFactory, packetWriterFactory); recordReaderFactory, recordWriterFactory);
} }
@Provides @Provides

View File

@@ -5,10 +5,10 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.PacketReader; import org.briarproject.bramble.api.sync.RecordReader;
import org.briarproject.bramble.api.sync.PacketReaderFactory; import org.briarproject.bramble.api.sync.RecordReaderFactory;
import org.briarproject.bramble.api.sync.PacketWriter; import org.briarproject.bramble.api.sync.RecordWriter;
import org.briarproject.bramble.api.sync.PacketWriterFactory; import org.briarproject.bramble.api.sync.RecordWriterFactory;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.Clock;
@@ -28,41 +28,41 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
private final Executor dbExecutor; private final Executor dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final Clock clock; private final Clock clock;
private final PacketReaderFactory packetReaderFactory; private final RecordReaderFactory recordReaderFactory;
private final PacketWriterFactory packetWriterFactory; private final RecordWriterFactory recordWriterFactory;
@Inject @Inject
SyncSessionFactoryImpl(DatabaseComponent db, SyncSessionFactoryImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor, EventBus eventBus, @DatabaseExecutor Executor dbExecutor, EventBus eventBus,
Clock clock, PacketReaderFactory packetReaderFactory, Clock clock, RecordReaderFactory recordReaderFactory,
PacketWriterFactory packetWriterFactory) { RecordWriterFactory recordWriterFactory) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.clock = clock; this.clock = clock;
this.packetReaderFactory = packetReaderFactory; this.recordReaderFactory = recordReaderFactory;
this.packetWriterFactory = packetWriterFactory; this.recordWriterFactory = recordWriterFactory;
} }
@Override @Override
public SyncSession createIncomingSession(ContactId c, InputStream in) { public SyncSession createIncomingSession(ContactId c, InputStream in) {
PacketReader packetReader = packetReaderFactory.createPacketReader(in); RecordReader recordReader = recordReaderFactory.createRecordReader(in);
return new IncomingSession(db, dbExecutor, eventBus, c, packetReader); return new IncomingSession(db, dbExecutor, eventBus, c, recordReader);
} }
@Override @Override
public SyncSession createSimplexOutgoingSession(ContactId c, public SyncSession createSimplexOutgoingSession(ContactId c,
int maxLatency, OutputStream out) { int maxLatency, OutputStream out) {
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out); RecordWriter recordWriter = recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
maxLatency, packetWriter); maxLatency, recordWriter);
} }
@Override @Override
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, OutputStream out) { int maxIdleTime, OutputStream out) {
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out); RecordWriter recordWriter = recordWriterFactory.createRecordWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
maxLatency, maxIdleTime, packetWriter); maxLatency, maxIdleTime, recordWriter);
} }
} }

View File

@@ -63,6 +63,7 @@ import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_PUBLIC_K
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERED; import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERED;
import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN; import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN;
import static org.briarproject.bramble.api.transport.TransportConstants.REORDERING_WINDOW_SIZE; import static org.briarproject.bramble.api.transport.TransportConstants.REORDERING_WINDOW_SIZE;
@@ -95,7 +96,7 @@ public class DatabaseComponentImplTest extends BrambleTestCase {
public DatabaseComponentImplTest() { public DatabaseComponentImplTest() {
clientId = new ClientId(TestUtils.getRandomString(5)); clientId = new ClientId(TestUtils.getRandomString(5));
groupId = new GroupId(TestUtils.getRandomId()); groupId = new GroupId(TestUtils.getRandomId());
byte[] descriptor = new byte[0]; byte[] descriptor = new byte[MAX_GROUP_DESCRIPTOR_LENGTH];
group = new Group(groupId, clientId, descriptor); group = new Group(groupId, clientId, descriptor);
authorId = new AuthorId(TestUtils.getRandomId()); authorId = new AuthorId(TestUtils.getRandomId());
author = new Author(authorId, "Alice", new byte[MAX_PUBLIC_KEY_LENGTH]); author = new Author(authorId, "Alice", new byte[MAX_PUBLIC_KEY_LENGTH]);

View File

@@ -47,6 +47,7 @@ import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_PUBLIC_K
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERED; import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERED;
import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID; import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID;
@@ -84,7 +85,7 @@ public class H2DatabaseTest extends BrambleTestCase {
public H2DatabaseTest() throws Exception { public H2DatabaseTest() throws Exception {
groupId = new GroupId(TestUtils.getRandomId()); groupId = new GroupId(TestUtils.getRandomId());
clientId = new ClientId(TestUtils.getRandomString(5)); clientId = new ClientId(TestUtils.getRandomString(5));
byte[] descriptor = new byte[0]; byte[] descriptor = new byte[MAX_GROUP_DESCRIPTOR_LENGTH];
group = new Group(groupId, clientId, descriptor); group = new Group(groupId, clientId, descriptor);
AuthorId authorId = new AuthorId(TestUtils.getRandomId()); AuthorId authorId = new AuthorId(TestUtils.getRandomId());
author = new Author(authorId, "Alice", new byte[MAX_PUBLIC_KEY_LENGTH]); author = new Author(authorId, "Alice", new byte[MAX_PUBLIC_KEY_LENGTH]);
@@ -1316,7 +1317,7 @@ public class H2DatabaseTest extends BrambleTestCase {
// Add a second group // Add a second group
GroupId groupId1 = new GroupId(TestUtils.getRandomId()); GroupId groupId1 = new GroupId(TestUtils.getRandomId());
Group group1 = new Group(groupId1, clientId, Group group1 = new Group(groupId1, clientId,
TestUtils.getRandomBytes(42)); TestUtils.getRandomBytes(MAX_GROUP_DESCRIPTOR_LENGTH));
db.addGroup(txn, group1); db.addGroup(txn, group1);
// Add a message to the second group // Add a message to the second group

View File

@@ -92,7 +92,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
oneOf(callbacks).connectionWaiting(); oneOf(callbacks).connectionWaiting();
oneOf(transport).receiveKey(); oneOf(transport).receiveKey();
will(returnValue(BOB_PUBKEY)); will(returnValue(BOB_PUBKEY));
oneOf(callbacks).initialPacketReceived(); oneOf(callbacks).initialRecordReceived();
// Alice verifies Bob's public key // Alice verifies Bob's public key
oneOf(crypto).deriveKeyCommitment(BOB_PUBKEY); oneOf(crypto).deriveKeyCommitment(BOB_PUBKEY);
@@ -152,7 +152,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
// Bob receives Alice's public key // Bob receives Alice's public key
oneOf(transport).receiveKey(); oneOf(transport).receiveKey();
will(returnValue(ALICE_PUBKEY)); will(returnValue(ALICE_PUBKEY));
oneOf(callbacks).initialPacketReceived(); oneOf(callbacks).initialRecordReceived();
// Bob verifies Alice's public key // Bob verifies Alice's public key
oneOf(crypto).deriveKeyCommitment(ALICE_PUBKEY); oneOf(crypto).deriveKeyCommitment(ALICE_PUBKEY);
@@ -213,7 +213,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
oneOf(callbacks).connectionWaiting(); oneOf(callbacks).connectionWaiting();
oneOf(transport).receiveKey(); oneOf(transport).receiveKey();
will(returnValue(BAD_PUBKEY)); will(returnValue(BAD_PUBKEY));
oneOf(callbacks).initialPacketReceived(); oneOf(callbacks).initialRecordReceived();
// Alice verifies Bob's public key // Alice verifies Bob's public key
oneOf(crypto).deriveKeyCommitment(BAD_PUBKEY); oneOf(crypto).deriveKeyCommitment(BAD_PUBKEY);
@@ -250,7 +250,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
// Bob receives a bad public key // Bob receives a bad public key
oneOf(transport).receiveKey(); oneOf(transport).receiveKey();
will(returnValue(BAD_PUBKEY)); will(returnValue(BAD_PUBKEY));
oneOf(callbacks).initialPacketReceived(); oneOf(callbacks).initialRecordReceived();
// Bob verifies Alice's public key // Bob verifies Alice's public key
oneOf(crypto).deriveKeyCommitment(BAD_PUBKEY); oneOf(crypto).deriveKeyCommitment(BAD_PUBKEY);
@@ -296,7 +296,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
oneOf(callbacks).connectionWaiting(); oneOf(callbacks).connectionWaiting();
oneOf(transport).receiveKey(); oneOf(transport).receiveKey();
will(returnValue(BOB_PUBKEY)); will(returnValue(BOB_PUBKEY));
oneOf(callbacks).initialPacketReceived(); oneOf(callbacks).initialRecordReceived();
// Alice verifies Bob's public key // Alice verifies Bob's public key
oneOf(crypto).deriveKeyCommitment(BOB_PUBKEY); oneOf(crypto).deriveKeyCommitment(BOB_PUBKEY);
@@ -357,7 +357,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
// Bob receives Alice's public key // Bob receives Alice's public key
oneOf(transport).receiveKey(); oneOf(transport).receiveKey();
will(returnValue(ALICE_PUBKEY)); will(returnValue(ALICE_PUBKEY));
oneOf(callbacks).initialPacketReceived(); oneOf(callbacks).initialRecordReceived();
// Bob verifies Alice's public key // Bob verifies Alice's public key
oneOf(crypto).deriveKeyCommitment(ALICE_PUBKEY); oneOf(crypto).deriveKeyCommitment(ALICE_PUBKEY);

View File

@@ -10,20 +10,20 @@ import org.junit.Test;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import static org.briarproject.bramble.api.sync.PacketTypes.ACK; import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
import static org.briarproject.bramble.api.sync.PacketTypes.OFFER; import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
import static org.briarproject.bramble.api.sync.PacketTypes.REQUEST; import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PACKET_HEADER_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.RECORD_HEADER_LENGTH;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class PacketReaderImplTest extends BrambleTestCase { public class RecordReaderImplTest extends BrambleTestCase {
@Test(expected = FormatException.class) @Test(expected = FormatException.class)
public void testFormatExceptionIfAckIsTooLarge() throws Exception { public void testFormatExceptionIfAckIsTooLarge() throws Exception {
byte[] b = createAck(true); byte[] b = createAck(true);
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readAck(); reader.readAck();
} }
@@ -31,7 +31,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testNoFormatExceptionIfAckIsMaximumSize() throws Exception { public void testNoFormatExceptionIfAckIsMaximumSize() throws Exception {
byte[] b = createAck(false); byte[] b = createAck(false);
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readAck(); reader.readAck();
} }
@@ -39,7 +39,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testEmptyAck() throws Exception { public void testEmptyAck() throws Exception {
byte[] b = createEmptyAck(); byte[] b = createEmptyAck();
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readAck(); reader.readAck();
} }
@@ -47,7 +47,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testFormatExceptionIfOfferIsTooLarge() throws Exception { public void testFormatExceptionIfOfferIsTooLarge() throws Exception {
byte[] b = createOffer(true); byte[] b = createOffer(true);
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readOffer(); reader.readOffer();
} }
@@ -55,7 +55,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testNoFormatExceptionIfOfferIsMaximumSize() throws Exception { public void testNoFormatExceptionIfOfferIsMaximumSize() throws Exception {
byte[] b = createOffer(false); byte[] b = createOffer(false);
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readOffer(); reader.readOffer();
} }
@@ -63,7 +63,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testEmptyOffer() throws Exception { public void testEmptyOffer() throws Exception {
byte[] b = createEmptyOffer(); byte[] b = createEmptyOffer();
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readOffer(); reader.readOffer();
} }
@@ -71,7 +71,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testFormatExceptionIfRequestIsTooLarge() throws Exception { public void testFormatExceptionIfRequestIsTooLarge() throws Exception {
byte[] b = createRequest(true); byte[] b = createRequest(true);
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readRequest(); reader.readRequest();
} }
@@ -79,7 +79,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testNoFormatExceptionIfRequestIsMaximumSize() throws Exception { public void testNoFormatExceptionIfRequestIsMaximumSize() throws Exception {
byte[] b = createRequest(false); byte[] b = createRequest(false);
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readRequest(); reader.readRequest();
} }
@@ -87,76 +87,76 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testEmptyRequest() throws Exception { public void testEmptyRequest() throws Exception {
byte[] b = createEmptyRequest(); byte[] b = createEmptyRequest();
ByteArrayInputStream in = new ByteArrayInputStream(b); ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in); RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readRequest(); reader.readRequest();
} }
private byte[] createAck(boolean tooBig) throws Exception { private byte[] createAck(boolean tooBig) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(new byte[PACKET_HEADER_LENGTH]); out.write(new byte[RECORD_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= PACKET_HEADER_LENGTH while (out.size() + UniqueId.LENGTH <= RECORD_HEADER_LENGTH
+ MAX_PACKET_PAYLOAD_LENGTH) { + MAX_RECORD_PAYLOAD_LENGTH) {
out.write(TestUtils.getRandomId()); out.write(TestUtils.getRandomId());
} }
if (tooBig) out.write(TestUtils.getRandomId()); if (tooBig) out.write(TestUtils.getRandomId());
assertEquals(tooBig, out.size() > PACKET_HEADER_LENGTH + assertEquals(tooBig, out.size() > RECORD_HEADER_LENGTH +
MAX_PACKET_PAYLOAD_LENGTH); MAX_RECORD_PAYLOAD_LENGTH);
byte[] packet = out.toByteArray(); byte[] record = out.toByteArray();
packet[1] = ACK; record[1] = ACK;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2); ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return packet; return record;
} }
private byte[] createEmptyAck() throws Exception { private byte[] createEmptyAck() throws Exception {
byte[] packet = new byte[PACKET_HEADER_LENGTH]; byte[] record = new byte[RECORD_HEADER_LENGTH];
packet[1] = ACK; record[1] = ACK;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2); ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return packet; return record;
} }
private byte[] createOffer(boolean tooBig) throws Exception { private byte[] createOffer(boolean tooBig) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(new byte[PACKET_HEADER_LENGTH]); out.write(new byte[RECORD_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= PACKET_HEADER_LENGTH while (out.size() + UniqueId.LENGTH <= RECORD_HEADER_LENGTH
+ MAX_PACKET_PAYLOAD_LENGTH) { + MAX_RECORD_PAYLOAD_LENGTH) {
out.write(TestUtils.getRandomId()); out.write(TestUtils.getRandomId());
} }
if (tooBig) out.write(TestUtils.getRandomId()); if (tooBig) out.write(TestUtils.getRandomId());
assertEquals(tooBig, out.size() > PACKET_HEADER_LENGTH + assertEquals(tooBig, out.size() > RECORD_HEADER_LENGTH +
MAX_PACKET_PAYLOAD_LENGTH); MAX_RECORD_PAYLOAD_LENGTH);
byte[] packet = out.toByteArray(); byte[] record = out.toByteArray();
packet[1] = OFFER; record[1] = OFFER;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2); ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return packet; return record;
} }
private byte[] createEmptyOffer() throws Exception { private byte[] createEmptyOffer() throws Exception {
byte[] packet = new byte[PACKET_HEADER_LENGTH]; byte[] record = new byte[RECORD_HEADER_LENGTH];
packet[1] = OFFER; record[1] = OFFER;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2); ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return packet; return record;
} }
private byte[] createRequest(boolean tooBig) throws Exception { private byte[] createRequest(boolean tooBig) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(new byte[PACKET_HEADER_LENGTH]); out.write(new byte[RECORD_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= PACKET_HEADER_LENGTH while (out.size() + UniqueId.LENGTH <= RECORD_HEADER_LENGTH
+ MAX_PACKET_PAYLOAD_LENGTH) { + MAX_RECORD_PAYLOAD_LENGTH) {
out.write(TestUtils.getRandomId()); out.write(TestUtils.getRandomId());
} }
if (tooBig) out.write(TestUtils.getRandomId()); if (tooBig) out.write(TestUtils.getRandomId());
assertEquals(tooBig, out.size() > PACKET_HEADER_LENGTH + assertEquals(tooBig, out.size() > RECORD_HEADER_LENGTH +
MAX_PACKET_PAYLOAD_LENGTH); MAX_RECORD_PAYLOAD_LENGTH);
byte[] packet = out.toByteArray(); byte[] record = out.toByteArray();
packet[1] = REQUEST; record[1] = REQUEST;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2); ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return packet; return record;
} }
private byte[] createEmptyRequest() throws Exception { private byte[] createEmptyRequest() throws Exception {
byte[] packet = new byte[PACKET_HEADER_LENGTH]; byte[] record = new byte[RECORD_HEADER_LENGTH];
packet[1] = REQUEST; record[1] = REQUEST;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2); ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return packet; return record;
} }
} }

View File

@@ -6,10 +6,10 @@ import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.PacketWriter;
import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.ImmediateExecutor; import org.briarproject.bramble.test.ImmediateExecutor;
import org.briarproject.bramble.test.TestUtils; import org.briarproject.bramble.test.TestUtils;
import org.briarproject.bramble.api.sync.RecordWriter;
import org.jmock.Expectations; import org.jmock.Expectations;
import org.jmock.Mockery; import org.jmock.Mockery;
import org.junit.Test; import org.junit.Test;
@@ -29,14 +29,14 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
private final ContactId contactId; private final ContactId contactId;
private final MessageId messageId; private final MessageId messageId;
private final int maxLatency; private final int maxLatency;
private final PacketWriter packetWriter; private final RecordWriter recordWriter;
public SimplexOutgoingSessionTest() { public SimplexOutgoingSessionTest() {
context = new Mockery(); context = new Mockery();
db = context.mock(DatabaseComponent.class); db = context.mock(DatabaseComponent.class);
dbExecutor = new ImmediateExecutor(); dbExecutor = new ImmediateExecutor();
eventBus = context.mock(EventBus.class); eventBus = context.mock(EventBus.class);
packetWriter = context.mock(PacketWriter.class); recordWriter = context.mock(RecordWriter.class);
contactId = new ContactId(234); contactId = new ContactId(234);
messageId = new MessageId(TestUtils.getRandomId()); messageId = new MessageId(TestUtils.getRandomId());
maxLatency = Integer.MAX_VALUE; maxLatency = Integer.MAX_VALUE;
@@ -45,7 +45,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
@Test @Test
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
final SimplexOutgoingSession session = new SimplexOutgoingSession(db, final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, maxLatency, packetWriter); dbExecutor, eventBus, contactId, maxLatency, recordWriter);
final Transaction noAckTxn = new Transaction(null, false); final Transaction noAckTxn = new Transaction(null, false);
final Transaction noMsgTxn = new Transaction(null, false); final Transaction noMsgTxn = new Transaction(null, false);
@@ -68,7 +68,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
oneOf(db).commitTransaction(noMsgTxn); oneOf(db).commitTransaction(noMsgTxn);
oneOf(db).endTransaction(noMsgTxn); oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream // Flush the output stream
oneOf(packetWriter).flush(); oneOf(recordWriter).flush();
// Remove listener // Remove listener
oneOf(eventBus).removeListener(session); oneOf(eventBus).removeListener(session);
}}); }});
@@ -83,7 +83,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
final Ack ack = new Ack(Collections.singletonList(messageId)); final Ack ack = new Ack(Collections.singletonList(messageId));
final byte[] raw = new byte[1234]; final byte[] raw = new byte[1234];
final SimplexOutgoingSession session = new SimplexOutgoingSession(db, final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, maxLatency, packetWriter); dbExecutor, eventBus, contactId, maxLatency, recordWriter);
final Transaction ackTxn = new Transaction(null, false); final Transaction ackTxn = new Transaction(null, false);
final Transaction noAckTxn = new Transaction(null, false); final Transaction noAckTxn = new Transaction(null, false);
final Transaction msgTxn = new Transaction(null, false); final Transaction msgTxn = new Transaction(null, false);
@@ -99,7 +99,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
will(returnValue(ack)); will(returnValue(ack));
oneOf(db).commitTransaction(ackTxn); oneOf(db).commitTransaction(ackTxn);
oneOf(db).endTransaction(ackTxn); oneOf(db).endTransaction(ackTxn);
oneOf(packetWriter).writeAck(ack); oneOf(recordWriter).writeAck(ack);
// One message to send // One message to send
oneOf(db).startTransaction(false); oneOf(db).startTransaction(false);
will(returnValue(msgTxn)); will(returnValue(msgTxn));
@@ -108,7 +108,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
will(returnValue(Arrays.asList(raw))); will(returnValue(Arrays.asList(raw)));
oneOf(db).commitTransaction(msgTxn); oneOf(db).commitTransaction(msgTxn);
oneOf(db).endTransaction(msgTxn); oneOf(db).endTransaction(msgTxn);
oneOf(packetWriter).writeMessage(raw); oneOf(recordWriter).writeMessage(raw);
// No more acks // No more acks
oneOf(db).startTransaction(false); oneOf(db).startTransaction(false);
will(returnValue(noAckTxn)); will(returnValue(noAckTxn));
@@ -125,7 +125,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
oneOf(db).commitTransaction(noMsgTxn); oneOf(db).commitTransaction(noMsgTxn);
oneOf(db).endTransaction(noMsgTxn); oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream // Flush the output stream
oneOf(packetWriter).flush(); oneOf(recordWriter).flush();
// Remove listener // Remove listener
oneOf(eventBus).removeListener(session); oneOf(eventBus).removeListener(session);
}}); }});

View File

@@ -12,10 +12,10 @@ import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageFactory; import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.PacketReader; import org.briarproject.bramble.api.sync.RecordReader;
import org.briarproject.bramble.api.sync.PacketReaderFactory; import org.briarproject.bramble.api.sync.RecordReaderFactory;
import org.briarproject.bramble.api.sync.PacketWriter; import org.briarproject.bramble.api.sync.RecordWriter;
import org.briarproject.bramble.api.sync.PacketWriterFactory; import org.briarproject.bramble.api.sync.RecordWriterFactory;
import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.transport.StreamContext; import org.briarproject.bramble.api.transport.StreamContext;
import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamReaderFactory;
@@ -33,6 +33,7 @@ import java.util.Collection;
import javax.inject.Inject; import javax.inject.Inject;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@@ -50,9 +51,9 @@ public class SyncIntegrationTest extends BrambleTestCase {
@Inject @Inject
StreamWriterFactory streamWriterFactory; StreamWriterFactory streamWriterFactory;
@Inject @Inject
PacketReaderFactory packetReaderFactory; RecordReaderFactory recordReaderFactory;
@Inject @Inject
PacketWriterFactory packetWriterFactory; RecordWriterFactory recordWriterFactory;
@Inject @Inject
CryptoComponent crypto; CryptoComponent crypto;
@@ -77,7 +78,7 @@ public class SyncIntegrationTest extends BrambleTestCase {
streamNumber = 123; streamNumber = 123;
// Create a group // Create a group
ClientId clientId = new ClientId(TestUtils.getRandomString(5)); ClientId clientId = new ClientId(TestUtils.getRandomString(5));
byte[] descriptor = new byte[0]; byte[] descriptor = new byte[MAX_GROUP_DESCRIPTOR_LENGTH];
Group group = groupFactory.createGroup(clientId, descriptor); Group group = groupFactory.createGroup(clientId, descriptor);
// Add two messages to the group // Add two messages to the group
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
@@ -98,14 +99,14 @@ public class SyncIntegrationTest extends BrambleTestCase {
headerKey, streamNumber); headerKey, streamNumber);
OutputStream streamWriter = streamWriterFactory.createStreamWriter(out, OutputStream streamWriter = streamWriterFactory.createStreamWriter(out,
ctx); ctx);
PacketWriter packetWriter = packetWriterFactory.createPacketWriter( RecordWriter recordWriter = recordWriterFactory.createRecordWriter(
streamWriter); streamWriter);
packetWriter.writeAck(new Ack(messageIds)); recordWriter.writeAck(new Ack(messageIds));
packetWriter.writeMessage(message.getRaw()); recordWriter.writeMessage(message.getRaw());
packetWriter.writeMessage(message1.getRaw()); recordWriter.writeMessage(message1.getRaw());
packetWriter.writeOffer(new Offer(messageIds)); recordWriter.writeOffer(new Offer(messageIds));
packetWriter.writeRequest(new Request(messageIds)); recordWriter.writeRequest(new Request(messageIds));
streamWriter.flush(); streamWriter.flush();
return out.toByteArray(); return out.toByteArray();
@@ -127,31 +128,31 @@ public class SyncIntegrationTest extends BrambleTestCase {
headerKey, streamNumber); headerKey, streamNumber);
InputStream streamReader = streamReaderFactory.createStreamReader(in, InputStream streamReader = streamReaderFactory.createStreamReader(in,
ctx); ctx);
PacketReader packetReader = packetReaderFactory.createPacketReader( RecordReader recordReader = recordReaderFactory.createRecordReader(
streamReader); streamReader);
// Read the ack // Read the ack
assertTrue(packetReader.hasAck()); assertTrue(recordReader.hasAck());
Ack a = packetReader.readAck(); Ack a = recordReader.readAck();
assertEquals(messageIds, a.getMessageIds()); assertEquals(messageIds, a.getMessageIds());
// Read the messages // Read the messages
assertTrue(packetReader.hasMessage()); assertTrue(recordReader.hasMessage());
Message m = packetReader.readMessage(); Message m = recordReader.readMessage();
checkMessageEquality(message, m); checkMessageEquality(message, m);
assertTrue(packetReader.hasMessage()); assertTrue(recordReader.hasMessage());
m = packetReader.readMessage(); m = recordReader.readMessage();
checkMessageEquality(message1, m); checkMessageEquality(message1, m);
assertFalse(packetReader.hasMessage()); assertFalse(recordReader.hasMessage());
// Read the offer // Read the offer
assertTrue(packetReader.hasOffer()); assertTrue(recordReader.hasOffer());
Offer o = packetReader.readOffer(); Offer o = recordReader.readOffer();
assertEquals(messageIds, o.getMessageIds()); assertEquals(messageIds, o.getMessageIds());
// Read the request // Read the request
assertTrue(packetReader.hasRequest()); assertTrue(recordReader.hasRequest());
Request req = packetReader.readRequest(); Request req = recordReader.readRequest();
assertEquals(messageIds, req.getMessageIds()); assertEquals(messageIds, req.getMessageIds());
in.close(); in.close();

View File

@@ -39,7 +39,11 @@ class QueueMessageFactoryImpl implements QueueMessageFactory {
ByteUtils.writeUint64(queuePosition, raw, MESSAGE_HEADER_LENGTH); ByteUtils.writeUint64(queuePosition, raw, MESSAGE_HEADER_LENGTH);
System.arraycopy(body, 0, raw, QUEUE_MESSAGE_HEADER_LENGTH, System.arraycopy(body, 0, raw, QUEUE_MESSAGE_HEADER_LENGTH,
body.length); body.length);
MessageId id = new MessageId(crypto.hash(MessageId.LABEL, raw)); byte[] timeBytes = new byte[ByteUtils.INT_64_BYTES];
ByteUtils.writeUint64(timestamp, timeBytes, 0);
MessageId id = new MessageId(
crypto.hash(MessageId.LABEL, groupId.getBytes(), timeBytes,
body));
return new QueueMessage(id, groupId, timestamp, queuePosition, raw); return new QueueMessage(id, groupId, timestamp, queuePosition, raw);
} }

View File

@@ -22,8 +22,8 @@ import javax.inject.Inject;
import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH; import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_PUBLIC_KEY_LENGTH; import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
import static org.briarproject.briar.api.forum.ForumConstants.MAX_FORUM_POST_BODY_LENGTH; import static org.briarproject.briar.api.forum.ForumConstants.MAX_FORUM_POST_BODY_LENGTH;
import static org.briarproject.briar.api.messaging.MessagingConstants.MAX_PRIVATE_MESSAGE_BODY_LENGTH; import static org.briarproject.briar.api.messaging.MessagingConstants.MAX_PRIVATE_MESSAGE_BODY_LENGTH;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@@ -59,7 +59,7 @@ public class MessageSizeIntegrationTest extends BriarTestCase {
int length = message.getMessage().getRaw().length; int length = message.getMessage().getRaw().length;
assertTrue( assertTrue(
length > UniqueId.LENGTH + 8 + MAX_PRIVATE_MESSAGE_BODY_LENGTH); length > UniqueId.LENGTH + 8 + MAX_PRIVATE_MESSAGE_BODY_LENGTH);
assertTrue(length <= MAX_PACKET_PAYLOAD_LENGTH); assertTrue(length <= MAX_RECORD_PAYLOAD_LENGTH);
} }
@Test @Test
@@ -85,7 +85,7 @@ public class MessageSizeIntegrationTest extends BriarTestCase {
+ MAX_AUTHOR_NAME_LENGTH + MAX_PUBLIC_KEY_LENGTH + MAX_AUTHOR_NAME_LENGTH + MAX_PUBLIC_KEY_LENGTH
+ ForumConstants.MAX_CONTENT_TYPE_LENGTH + ForumConstants.MAX_CONTENT_TYPE_LENGTH
+ MAX_FORUM_POST_BODY_LENGTH); + MAX_FORUM_POST_BODY_LENGTH);
assertTrue(length <= MAX_PACKET_PAYLOAD_LENGTH); assertTrue(length <= MAX_RECORD_PAYLOAD_LENGTH);
} }
private static void injectEagerSingletons( private static void injectEagerSingletons(