Use generic record reader/writer for sync.

This commit is contained in:
akwizgran
2018-04-19 13:11:48 +01:00
parent 4100daaa47
commit 215c62ed23
16 changed files with 203 additions and 243 deletions

View File

@@ -39,8 +39,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
/**
* An outgoing {@link SyncSession} suitable for duplex transports. The session
@@ -273,7 +273,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
Transaction txn = db.startTransaction(false);
try {
b = db.generateRequestedBatch(txn, contactId,
MAX_RECORD_PAYLOAD_LENGTH, maxLatency);
MAX_RECORD_PAYLOAD_BYTES, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
} finally {

View File

@@ -16,6 +16,7 @@ import static org.briarproject.bramble.api.sync.Message.FORMAT_VERSION;
import static org.briarproject.bramble.api.sync.MessageId.BLOCK_LABEL;
import static org.briarproject.bramble.api.sync.MessageId.ID_LABEL;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.bramble.util.ByteUtils.INT_64_BYTES;
@@ -53,6 +54,8 @@ class MessageFactoryImpl implements MessageFactory {
public Message createMessage(MessageId m, byte[] raw) {
if (raw.length < MESSAGE_HEADER_LENGTH)
throw new IllegalArgumentException();
if (raw.length > MAX_MESSAGE_LENGTH)
throw new IllegalArgumentException();
byte[] groupId = new byte[UniqueId.LENGTH];
System.arraycopy(raw, 0, groupId, 0, UniqueId.LENGTH);
long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH);

View File

@@ -29,8 +29,8 @@ import javax.annotation.concurrent.ThreadSafe;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
/**
* An outgoing {@link SyncSession} suitable for simplex transports. The session
@@ -171,7 +171,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
Transaction txn = db.startTransaction(false);
try {
b = db.generateBatch(txn, contactId,
MAX_RECORD_PAYLOAD_LENGTH, maxLatency);
MAX_RECORD_PAYLOAD_BYTES, maxLatency);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);

View File

@@ -58,8 +58,9 @@ public class SyncModule {
}
@Provides
SyncRecordWriterFactory provideRecordWriterFactory() {
return new SyncRecordWriterFactoryImpl();
SyncRecordWriterFactory provideRecordWriterFactory(
SyncRecordWriterFactoryImpl recordWriterFactory) {
return recordWriterFactory;
}
@Provides

View File

@@ -1,6 +1,8 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.record.RecordReader;
import org.briarproject.bramble.api.record.RecordReaderFactory;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.SyncRecordReader;
import org.briarproject.bramble.api.sync.SyncRecordReaderFactory;
@@ -15,14 +17,18 @@ import javax.inject.Inject;
class SyncRecordReaderFactoryImpl implements SyncRecordReaderFactory {
private final MessageFactory messageFactory;
private final RecordReaderFactory recordReaderFactory;
@Inject
SyncRecordReaderFactoryImpl(MessageFactory messageFactory) {
SyncRecordReaderFactoryImpl(MessageFactory messageFactory,
RecordReaderFactory recordReaderFactory) {
this.messageFactory = messageFactory;
this.recordReaderFactory = recordReaderFactory;
}
@Override
public SyncRecordReader createRecordReader(InputStream in) {
return new SyncRecordReaderImpl(messageFactory, in);
RecordReader reader = recordReaderFactory.createRecordReader(in);
return new SyncRecordReaderImpl(messageFactory, reader);
}
}

View File

@@ -3,6 +3,8 @@ package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.FormatException;
import org.briarproject.bramble.api.UniqueId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.record.Record;
import org.briarproject.bramble.api.record.RecordReader;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message;
@@ -13,72 +15,45 @@ import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordReader;
import org.briarproject.bramble.util.ByteUtils;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
import static org.briarproject.bramble.api.sync.RecordTypes.MESSAGE;
import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION;
import static org.briarproject.bramble.api.sync.SyncConstants.RECORD_HEADER_LENGTH;
@NotThreadSafe
@NotNullByDefault
class SyncRecordReaderImpl implements SyncRecordReader {
private enum State {BUFFER_EMPTY, BUFFER_FULL, EOF}
private final MessageFactory messageFactory;
private final InputStream in;
private final byte[] header, payload;
private final RecordReader reader;
private State state = State.BUFFER_EMPTY;
private int payloadLength = 0;
@Nullable
private Record nextRecord = null;
private boolean eof = false;
SyncRecordReaderImpl(MessageFactory messageFactory, InputStream in) {
SyncRecordReaderImpl(MessageFactory messageFactory, RecordReader reader) {
this.messageFactory = messageFactory;
this.in = in;
header = new byte[RECORD_HEADER_LENGTH];
payload = new byte[MAX_RECORD_PAYLOAD_LENGTH];
this.reader = reader;
}
private void readRecord() throws IOException {
if (state != State.BUFFER_EMPTY) throw new IllegalStateException();
assert nextRecord == null;
while (true) {
// Read the header
int offset = 0;
while (offset < RECORD_HEADER_LENGTH) {
int read =
in.read(header, offset, RECORD_HEADER_LENGTH - offset);
if (read == -1) {
if (offset > 0) throw new FormatException();
state = State.EOF;
return;
}
offset += read;
}
byte version = header[0], type = header[1];
payloadLength = ByteUtils.readUint16(header, 2);
nextRecord = reader.readRecord();
// Check the protocol version
byte version = nextRecord.getProtocolVersion();
if (version != PROTOCOL_VERSION) throw new FormatException();
// Check the payload length
if (payloadLength > MAX_RECORD_PAYLOAD_LENGTH)
throw new FormatException();
// Read the payload
offset = 0;
while (offset < payloadLength) {
int read = in.read(payload, offset, payloadLength - offset);
if (read == -1) throw new FormatException();
offset += read;
}
state = State.BUFFER_FULL;
byte type = nextRecord.getRecordType();
// Return if this is a known record type, otherwise continue
if (type == ACK || type == MESSAGE || type == OFFER ||
type == REQUEST) {
@@ -87,6 +62,11 @@ class SyncRecordReaderImpl implements SyncRecordReader {
}
}
private byte getNextRecordType() {
assert nextRecord != null;
return nextRecord.getRecordType();
}
/**
* Returns true if there's another record available or false if we've
* reached the end of the input stream.
@@ -97,14 +77,21 @@ class SyncRecordReaderImpl implements SyncRecordReader {
*/
@Override
public boolean eof() throws IOException {
if (state == State.BUFFER_EMPTY) readRecord();
if (state == State.BUFFER_EMPTY) throw new IllegalStateException();
return state == State.EOF;
if (nextRecord != null) return false;
if (eof) return true;
try {
readRecord();
return false;
} catch (EOFException e) {
nextRecord = null;
eof = true;
return true;
}
}
@Override
public boolean hasAck() throws IOException {
return !eof() && header[1] == ACK;
return !eof() && getNextRecordType() == ACK;
}
@Override
@@ -114,27 +101,31 @@ class SyncRecordReaderImpl implements SyncRecordReader {
}
private List<MessageId> readMessageIds() throws IOException {
if (payloadLength == 0) throw new FormatException();
if (payloadLength % UniqueId.LENGTH != 0) throw new FormatException();
List<MessageId> ids = new ArrayList<>();
for (int off = 0; off < payloadLength; off += UniqueId.LENGTH) {
assert nextRecord != null;
byte[] payload = nextRecord.getPayload();
if (payload.length == 0) throw new FormatException();
if (payload.length % UniqueId.LENGTH != 0) throw new FormatException();
List<MessageId> ids = new ArrayList<>(payload.length / UniqueId.LENGTH);
for (int off = 0; off < payload.length; off += UniqueId.LENGTH) {
byte[] id = new byte[UniqueId.LENGTH];
System.arraycopy(payload, off, id, 0, UniqueId.LENGTH);
ids.add(new MessageId(id));
}
state = State.BUFFER_EMPTY;
nextRecord = null;
return ids;
}
@Override
public boolean hasMessage() throws IOException {
return !eof() && header[1] == MESSAGE;
return !eof() && getNextRecordType() == MESSAGE;
}
@Override
public Message readMessage() throws IOException {
if (!hasMessage()) throw new FormatException();
if (payloadLength <= MESSAGE_HEADER_LENGTH) throw new FormatException();
assert nextRecord != null;
byte[] payload = nextRecord.getPayload();
if (payload.length < MESSAGE_HEADER_LENGTH) throw new FormatException();
// Group ID
byte[] id = new byte[UniqueId.LENGTH];
System.arraycopy(payload, 0, id, 0, UniqueId.LENGTH);
@@ -143,16 +134,17 @@ class SyncRecordReaderImpl implements SyncRecordReader {
long timestamp = ByteUtils.readUint64(payload, UniqueId.LENGTH);
if (timestamp < 0) throw new FormatException();
// Body
byte[] body = new byte[payloadLength - MESSAGE_HEADER_LENGTH];
byte[] body = new byte[payload.length - MESSAGE_HEADER_LENGTH];
System.arraycopy(payload, MESSAGE_HEADER_LENGTH, body, 0,
payloadLength - MESSAGE_HEADER_LENGTH);
state = State.BUFFER_EMPTY;
payload.length - MESSAGE_HEADER_LENGTH);
nextRecord = null;
// TODO: Add a method that reuses the raw message
return messageFactory.createMessage(groupId, timestamp, body);
}
@Override
public boolean hasOffer() throws IOException {
return !eof() && header[1] == OFFER;
return !eof() && getNextRecordType() == OFFER;
}
@Override
@@ -163,7 +155,7 @@ class SyncRecordReaderImpl implements SyncRecordReader {
@Override
public boolean hasRequest() throws IOException {
return !eof() && header[1] == REQUEST;
return !eof() && getNextRecordType() == REQUEST;
}
@Override

View File

@@ -1,16 +1,28 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.record.RecordWriter;
import org.briarproject.bramble.api.record.RecordWriterFactory;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncRecordWriterFactory;
import java.io.OutputStream;
import javax.inject.Inject;
@NotNullByDefault
class SyncRecordWriterFactoryImpl implements SyncRecordWriterFactory {
private final RecordWriterFactory recordWriterFactory;
@Inject
SyncRecordWriterFactoryImpl(RecordWriterFactory recordWriterFactory) {
this.recordWriterFactory = recordWriterFactory;
}
@Override
public SyncRecordWriter createRecordWriter(OutputStream out) {
return new SyncRecordWriterImpl(out);
RecordWriter writer = recordWriterFactory.createRecordWriter(out);
return new SyncRecordWriterImpl(writer);
}
}

View File

@@ -1,81 +1,67 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.record.Record;
import org.briarproject.bramble.api.record.RecordWriter;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.RecordTypes;
import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.util.ByteUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import javax.annotation.concurrent.NotThreadSafe;
import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
import static org.briarproject.bramble.api.sync.RecordTypes.MESSAGE;
import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION;
import static org.briarproject.bramble.api.sync.SyncConstants.RECORD_HEADER_LENGTH;
@NotThreadSafe
@NotNullByDefault
class SyncRecordWriterImpl implements SyncRecordWriter {
private final OutputStream out;
private final byte[] header;
private final ByteArrayOutputStream payload;
private final RecordWriter writer;
private final ByteArrayOutputStream payload = new ByteArrayOutputStream();
SyncRecordWriterImpl(OutputStream out) {
this.out = out;
header = new byte[RECORD_HEADER_LENGTH];
header[0] = PROTOCOL_VERSION;
payload = new ByteArrayOutputStream(MAX_RECORD_PAYLOAD_LENGTH);
SyncRecordWriterImpl(RecordWriter writer) {
this.writer = writer;
}
private void writeRecord(byte recordType) throws IOException {
header[1] = recordType;
ByteUtils.writeUint16(payload.size(), header, 2);
out.write(header);
payload.writeTo(out);
writer.writeRecord(new Record(PROTOCOL_VERSION, recordType,
payload.toByteArray()));
payload.reset();
}
@Override
public void writeAck(Ack a) throws IOException {
if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : a.getMessageIds()) payload.write(m.getBytes());
writeRecord(ACK);
}
@Override
public void writeMessage(byte[] raw) throws IOException {
header[1] = RecordTypes.MESSAGE;
ByteUtils.writeUint16(raw.length, header, 2);
out.write(header);
out.write(raw);
writer.writeRecord(new Record(PROTOCOL_VERSION, MESSAGE, raw));
}
@Override
public void writeOffer(Offer o) throws IOException {
if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : o.getMessageIds()) payload.write(m.getBytes());
writeRecord(OFFER);
}
@Override
public void writeRequest(Request r) throws IOException {
if (payload.size() != 0) throw new IllegalStateException();
for (MessageId m : r.getMessageIds()) payload.write(m.getBytes());
writeRecord(REQUEST);
}
@Override
public void flush() throws IOException {
out.flush();
writer.flush();
}
}