mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-15 04:18:53 +01:00
Moved the messaging protocol one step closer to BSP.
This breaks backward compatibility for the wire protocol and messages stored in the database. The database schema version has been incremented.
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
package org.briarproject.messaging;
|
||||
|
||||
import static org.briarproject.api.messaging.Types.AUTHOR;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
@@ -45,10 +43,10 @@ class AuthorFactoryImpl implements AuthorFactory {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Writer w = writerFactory.createWriter(out);
|
||||
try {
|
||||
w.writeStructStart(AUTHOR);
|
||||
w.writeListStart();
|
||||
w.writeString(name);
|
||||
w.writeBytes(publicKey);
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
} catch(IOException e) {
|
||||
// Shouldn't happen with ByteArrayOutputStream
|
||||
throw new RuntimeException();
|
||||
|
||||
@@ -2,7 +2,6 @@ package org.briarproject.messaging;
|
||||
|
||||
import static org.briarproject.api.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
|
||||
import static org.briarproject.api.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
|
||||
import static org.briarproject.api.messaging.Types.AUTHOR;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@@ -13,9 +12,9 @@ import org.briarproject.api.crypto.CryptoComponent;
|
||||
import org.briarproject.api.crypto.MessageDigest;
|
||||
import org.briarproject.api.serial.DigestingConsumer;
|
||||
import org.briarproject.api.serial.Reader;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
import org.briarproject.api.serial.ObjectReader;
|
||||
|
||||
class AuthorReader implements StructReader<Author> {
|
||||
class AuthorReader implements ObjectReader<Author> {
|
||||
|
||||
private final MessageDigest messageDigest;
|
||||
|
||||
@@ -23,16 +22,16 @@ class AuthorReader implements StructReader<Author> {
|
||||
messageDigest = crypto.getMessageDigest();
|
||||
}
|
||||
|
||||
public Author readStruct(Reader r) throws IOException {
|
||||
public Author readObject(Reader r) throws IOException {
|
||||
// Set up the reader
|
||||
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
|
||||
r.addConsumer(digesting);
|
||||
// Read and digest the data
|
||||
r.readStructStart(AUTHOR);
|
||||
r.readListStart();
|
||||
String name = r.readString(MAX_AUTHOR_NAME_LENGTH);
|
||||
if(name.length() == 0) throw new FormatException();
|
||||
byte[] publicKey = r.readBytes(MAX_PUBLIC_KEY_LENGTH);
|
||||
r.readStructEnd();
|
||||
r.readListEnd();
|
||||
// Reset the reader
|
||||
r.removeConsumer(digesting);
|
||||
// Build and return the author
|
||||
|
||||
@@ -3,7 +3,7 @@ package org.briarproject.messaging;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PAYLOAD_LENGTH;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
@@ -260,7 +260,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
|
||||
if(interrupted) return;
|
||||
try {
|
||||
Collection<byte[]> b = db.generateRequestedBatch(contactId,
|
||||
MAX_PACKET_LENGTH, maxLatency);
|
||||
MAX_PAYLOAD_LENGTH, maxLatency);
|
||||
if(LOG.isLoggable(INFO))
|
||||
LOG.info("Generated batch: " + (b != null));
|
||||
if(b != null) writerTasks.add(new WriteBatch(b));
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package org.briarproject.messaging;
|
||||
|
||||
import static org.briarproject.api.messaging.MessagingConstants.GROUP_SALT_LENGTH;
|
||||
import static org.briarproject.api.messaging.Types.GROUP;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@@ -37,10 +36,10 @@ class GroupFactoryImpl implements GroupFactory {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Writer w = writerFactory.createWriter(out);
|
||||
try {
|
||||
w.writeStructStart(GROUP);
|
||||
w.writeListStart();
|
||||
w.writeString(name);
|
||||
w.writeBytes(salt);
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
} catch(IOException e) {
|
||||
// Shouldn't happen with ByteArrayOutputStream
|
||||
throw new RuntimeException();
|
||||
|
||||
@@ -2,7 +2,6 @@ package org.briarproject.messaging;
|
||||
|
||||
import static org.briarproject.api.messaging.MessagingConstants.GROUP_SALT_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_GROUP_NAME_LENGTH;
|
||||
import static org.briarproject.api.messaging.Types.GROUP;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@@ -13,9 +12,9 @@ import org.briarproject.api.messaging.Group;
|
||||
import org.briarproject.api.messaging.GroupId;
|
||||
import org.briarproject.api.serial.DigestingConsumer;
|
||||
import org.briarproject.api.serial.Reader;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
import org.briarproject.api.serial.ObjectReader;
|
||||
|
||||
class GroupReader implements StructReader<Group> {
|
||||
class GroupReader implements ObjectReader<Group> {
|
||||
|
||||
private final MessageDigest messageDigest;
|
||||
|
||||
@@ -23,16 +22,16 @@ class GroupReader implements StructReader<Group> {
|
||||
messageDigest = crypto.getMessageDigest();
|
||||
}
|
||||
|
||||
public Group readStruct(Reader r) throws IOException {
|
||||
public Group readObject(Reader r) throws IOException {
|
||||
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
|
||||
// Read and digest the data
|
||||
r.addConsumer(digesting);
|
||||
r.readStructStart(GROUP);
|
||||
r.readListStart();
|
||||
String name = r.readString(MAX_GROUP_NAME_LENGTH);
|
||||
if(name.length() == 0) throw new FormatException();
|
||||
byte[] salt = r.readBytes(GROUP_SALT_LENGTH);
|
||||
if(salt.length != GROUP_SALT_LENGTH) throw new FormatException();
|
||||
r.readStructEnd();
|
||||
r.readListEnd();
|
||||
r.removeConsumer(digesting);
|
||||
// Build and return the group
|
||||
GroupId id = new GroupId(messageDigest.digest());
|
||||
|
||||
@@ -3,11 +3,8 @@ package org.briarproject.messaging;
|
||||
import static org.briarproject.api.AuthorConstants.MAX_SIGNATURE_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_BODY_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PAYLOAD_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MESSAGE_SALT_LENGTH;
|
||||
import static org.briarproject.api.messaging.Types.AUTHOR;
|
||||
import static org.briarproject.api.messaging.Types.GROUP;
|
||||
import static org.briarproject.api.messaging.Types.MESSAGE;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@@ -77,7 +74,7 @@ class MessageFactoryImpl implements MessageFactory {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Writer w = writerFactory.createWriter(out);
|
||||
// Initialise the consumers
|
||||
CountingConsumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
|
||||
CountingConsumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
|
||||
w.addConsumer(counting);
|
||||
Consumer digestingConsumer = new DigestingConsumer(messageDigest);
|
||||
w.addConsumer(digestingConsumer);
|
||||
@@ -88,7 +85,7 @@ class MessageFactoryImpl implements MessageFactory {
|
||||
w.addConsumer(signingConsumer);
|
||||
}
|
||||
// Write the message
|
||||
w.writeStructStart(MESSAGE);
|
||||
w.writeListStart();
|
||||
if(parent == null) w.writeNull();
|
||||
else w.writeBytes(parent.getBytes());
|
||||
writeGroup(w, group);
|
||||
@@ -111,7 +108,7 @@ class MessageFactoryImpl implements MessageFactory {
|
||||
throw new IllegalArgumentException();
|
||||
w.writeBytes(sig);
|
||||
}
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
// Hash the message, including the signature, to get the message ID
|
||||
w.removeConsumer(digestingConsumer);
|
||||
MessageId id = new MessageId(messageDigest.digest());
|
||||
@@ -120,16 +117,16 @@ class MessageFactoryImpl implements MessageFactory {
|
||||
}
|
||||
|
||||
private void writeGroup(Writer w, Group g) throws IOException {
|
||||
w.writeStructStart(GROUP);
|
||||
w.writeListStart();
|
||||
w.writeString(g.getName());
|
||||
w.writeBytes(g.getSalt());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
}
|
||||
|
||||
private void writeAuthor(Writer w, Author a) throws IOException {
|
||||
w.writeStructStart(AUTHOR);
|
||||
w.writeListStart();
|
||||
w.writeString(a.getName());
|
||||
w.writeBytes(a.getPublicKey());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,8 @@ package org.briarproject.messaging;
|
||||
import static org.briarproject.api.AuthorConstants.MAX_SIGNATURE_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_BODY_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PAYLOAD_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MESSAGE_SALT_LENGTH;
|
||||
import static org.briarproject.api.messaging.Types.MESSAGE;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@@ -17,27 +16,27 @@ import org.briarproject.api.messaging.MessageId;
|
||||
import org.briarproject.api.messaging.UnverifiedMessage;
|
||||
import org.briarproject.api.serial.CopyingConsumer;
|
||||
import org.briarproject.api.serial.CountingConsumer;
|
||||
import org.briarproject.api.serial.ObjectReader;
|
||||
import org.briarproject.api.serial.Reader;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
|
||||
class MessageReader implements StructReader<UnverifiedMessage> {
|
||||
class MessageReader implements ObjectReader<UnverifiedMessage> {
|
||||
|
||||
private final StructReader<Group> groupReader;
|
||||
private final StructReader<Author> authorReader;
|
||||
private final ObjectReader<Group> groupReader;
|
||||
private final ObjectReader<Author> authorReader;
|
||||
|
||||
MessageReader(StructReader<Group> groupReader,
|
||||
StructReader<Author> authorReader) {
|
||||
MessageReader(ObjectReader<Group> groupReader,
|
||||
ObjectReader<Author> authorReader) {
|
||||
this.groupReader = groupReader;
|
||||
this.authorReader = authorReader;
|
||||
}
|
||||
|
||||
public UnverifiedMessage readStruct(Reader r) throws IOException {
|
||||
public UnverifiedMessage readObject(Reader r) throws IOException {
|
||||
CopyingConsumer copying = new CopyingConsumer();
|
||||
CountingConsumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
|
||||
CountingConsumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
|
||||
r.addConsumer(copying);
|
||||
r.addConsumer(counting);
|
||||
// Read the start of the struct
|
||||
r.readStructStart(MESSAGE);
|
||||
// Read the start of the message
|
||||
r.readListStart();
|
||||
// Read the parent's message ID, if there is one
|
||||
MessageId parent = null;
|
||||
if(r.hasNull()) {
|
||||
@@ -48,11 +47,11 @@ class MessageReader implements StructReader<UnverifiedMessage> {
|
||||
parent = new MessageId(b);
|
||||
}
|
||||
// Read the group
|
||||
Group group = groupReader.readStruct(r);
|
||||
Group group = groupReader.readObject(r);
|
||||
// Read the author, if there is one
|
||||
Author author = null;
|
||||
if(r.hasNull()) r.readNull();
|
||||
else author = authorReader.readStruct(r);
|
||||
else author = authorReader.readObject(r);
|
||||
// Read the content type
|
||||
String contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
|
||||
// Read the timestamp
|
||||
@@ -71,11 +70,12 @@ class MessageReader implements StructReader<UnverifiedMessage> {
|
||||
byte[] signature = null;
|
||||
if(author == null) r.readNull();
|
||||
else signature = r.readBytes(MAX_SIGNATURE_LENGTH);
|
||||
// Read the end of the struct
|
||||
r.readStructEnd();
|
||||
// The signature will be verified later
|
||||
// Read the end of the message
|
||||
r.readListEnd();
|
||||
// Reset the reader
|
||||
r.removeConsumer(counting);
|
||||
r.removeConsumer(copying);
|
||||
// Build and return the unverified message
|
||||
byte[] raw = copying.getCopy();
|
||||
return new UnverifiedMessage(parent, group, author, contentType,
|
||||
timestamp, raw, signature, bodyStart, body.length,
|
||||
|
||||
@@ -14,7 +14,7 @@ import org.briarproject.api.messaging.PacketReaderFactory;
|
||||
import org.briarproject.api.messaging.PacketWriterFactory;
|
||||
import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.messaging.UnverifiedMessage;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
import org.briarproject.api.serial.ObjectReader;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
@@ -34,25 +34,25 @@ public class MessagingModule extends AbstractModule {
|
||||
}
|
||||
|
||||
@Provides
|
||||
StructReader<Author> getAuthorReader(CryptoComponent crypto) {
|
||||
ObjectReader<Author> getAuthorReader(CryptoComponent crypto) {
|
||||
return new AuthorReader(crypto);
|
||||
}
|
||||
|
||||
@Provides
|
||||
StructReader<Group> getGroupReader(CryptoComponent crypto) {
|
||||
ObjectReader<Group> getGroupReader(CryptoComponent crypto) {
|
||||
return new GroupReader(crypto);
|
||||
}
|
||||
|
||||
@Provides
|
||||
StructReader<UnverifiedMessage> getMessageReader(
|
||||
StructReader<Group> groupReader,
|
||||
StructReader<Author> authorReader) {
|
||||
ObjectReader<UnverifiedMessage> getMessageReader(
|
||||
ObjectReader<Group> groupReader,
|
||||
ObjectReader<Author> authorReader) {
|
||||
return new MessageReader(groupReader, authorReader);
|
||||
}
|
||||
|
||||
@Provides
|
||||
StructReader<SubscriptionUpdate> getSubscriptionUpdateReader(
|
||||
StructReader<Group> groupReader) {
|
||||
ObjectReader<SubscriptionUpdate> getSubscriptionUpdateReader(
|
||||
ObjectReader<Group> groupReader) {
|
||||
return new SubscriptionUpdateReader(groupReader);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,18 +9,18 @@ import org.briarproject.api.messaging.PacketReaderFactory;
|
||||
import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.messaging.UnverifiedMessage;
|
||||
import org.briarproject.api.serial.ReaderFactory;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
import org.briarproject.api.serial.ObjectReader;
|
||||
|
||||
class PacketReaderFactoryImpl implements PacketReaderFactory {
|
||||
|
||||
private final ReaderFactory readerFactory;
|
||||
private final StructReader<UnverifiedMessage> messageReader;
|
||||
private final StructReader<SubscriptionUpdate> subscriptionUpdateReader;
|
||||
private final ObjectReader<UnverifiedMessage> messageReader;
|
||||
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
|
||||
|
||||
@Inject
|
||||
PacketReaderFactoryImpl(ReaderFactory readerFactory,
|
||||
StructReader<UnverifiedMessage> messageReader,
|
||||
StructReader<SubscriptionUpdate> subscriptionUpdateReader) {
|
||||
ObjectReader<UnverifiedMessage> messageReader,
|
||||
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader) {
|
||||
this.readerFactory = readerFactory;
|
||||
this.messageReader = messageReader;
|
||||
this.subscriptionUpdateReader = subscriptionUpdateReader;
|
||||
|
||||
@@ -3,18 +3,21 @@ package org.briarproject.messaging;
|
||||
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTIES_PER_TRANSPORT;
|
||||
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
|
||||
import static org.briarproject.api.TransportPropertyConstants.MAX_TRANSPORT_ID_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
import static org.briarproject.api.messaging.Types.ACK;
|
||||
import static org.briarproject.api.messaging.Types.MESSAGE;
|
||||
import static org.briarproject.api.messaging.Types.OFFER;
|
||||
import static org.briarproject.api.messaging.Types.REQUEST;
|
||||
import static org.briarproject.api.messaging.Types.RETENTION_ACK;
|
||||
import static org.briarproject.api.messaging.Types.RETENTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_ACK;
|
||||
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.Types.TRANSPORT_ACK;
|
||||
import static org.briarproject.api.messaging.Types.TRANSPORT_UPDATE;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.HEADER_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PAYLOAD_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.PROTOCOL_VERSION;
|
||||
import static org.briarproject.api.messaging.PacketTypes.ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.MESSAGE;
|
||||
import static org.briarproject.api.messaging.PacketTypes.OFFER;
|
||||
import static org.briarproject.api.messaging.PacketTypes.REQUEST;
|
||||
import static org.briarproject.api.messaging.PacketTypes.RETENTION_ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.RETENTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.PacketTypes.SUBSCRIPTION_ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.SUBSCRIPTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.PacketTypes.TRANSPORT_ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.TRANSPORT_UPDATE;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
@@ -39,42 +42,82 @@ import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.messaging.TransportAck;
|
||||
import org.briarproject.api.messaging.TransportUpdate;
|
||||
import org.briarproject.api.messaging.UnverifiedMessage;
|
||||
import org.briarproject.api.serial.Consumer;
|
||||
import org.briarproject.api.serial.CountingConsumer;
|
||||
import org.briarproject.api.serial.Reader;
|
||||
import org.briarproject.api.serial.ReaderFactory;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
import org.briarproject.api.serial.ObjectReader;
|
||||
import org.briarproject.util.ByteUtils;
|
||||
|
||||
// This class is not thread-safe
|
||||
class PacketReaderImpl implements PacketReader {
|
||||
|
||||
private final StructReader<UnverifiedMessage> messageReader;
|
||||
private final StructReader<SubscriptionUpdate> subscriptionUpdateReader;
|
||||
private final Reader r;
|
||||
private enum State { BUFFER_EMPTY, BUFFER_FULL, EOF };
|
||||
|
||||
private final ReaderFactory readerFactory;
|
||||
private final ObjectReader<UnverifiedMessage> messageReader;
|
||||
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
|
||||
private final InputStream in;
|
||||
private final byte[] header, payload;
|
||||
|
||||
private State state = State.BUFFER_EMPTY;
|
||||
private int payloadLength = 0;
|
||||
|
||||
PacketReaderImpl(ReaderFactory readerFactory,
|
||||
StructReader<UnverifiedMessage> messageReader,
|
||||
StructReader<SubscriptionUpdate> subscriptionUpdateReader,
|
||||
ObjectReader<UnverifiedMessage> messageReader,
|
||||
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader,
|
||||
InputStream in) {
|
||||
this.readerFactory = readerFactory;
|
||||
this.messageReader = messageReader;
|
||||
this.subscriptionUpdateReader = subscriptionUpdateReader;
|
||||
r = readerFactory.createReader(in);
|
||||
this.in = in;
|
||||
header = new byte[HEADER_LENGTH];
|
||||
payload = new byte[MAX_PAYLOAD_LENGTH];
|
||||
}
|
||||
|
||||
private void readPacket() throws IOException {
|
||||
assert state == State.BUFFER_EMPTY;
|
||||
// Read the header
|
||||
int offset = 0;
|
||||
while(offset < HEADER_LENGTH) {
|
||||
int read = in.read(header, offset, HEADER_LENGTH - offset);
|
||||
if(read == -1) {
|
||||
if(offset > 0) throw new FormatException();
|
||||
state = State.EOF;
|
||||
return;
|
||||
}
|
||||
offset += read;
|
||||
}
|
||||
// Check the protocol version
|
||||
if(header[0] != PROTOCOL_VERSION) throw new FormatException();
|
||||
// Read the payload length
|
||||
payloadLength = ByteUtils.readUint16(header, 2);
|
||||
if(payloadLength > MAX_PAYLOAD_LENGTH) throw new FormatException();
|
||||
// Read the payload
|
||||
offset = 0;
|
||||
while(offset < payloadLength) {
|
||||
int read = in.read(payload, offset, payloadLength - offset);
|
||||
if(read == -1) throw new FormatException();
|
||||
offset += read;
|
||||
}
|
||||
state = State.BUFFER_FULL;
|
||||
}
|
||||
|
||||
public boolean eof() throws IOException {
|
||||
return r.eof();
|
||||
if(state == State.BUFFER_EMPTY) readPacket();
|
||||
assert state != State.BUFFER_EMPTY;
|
||||
return state == State.EOF;
|
||||
}
|
||||
|
||||
public boolean hasAck() throws IOException {
|
||||
return r.hasStruct(ACK);
|
||||
return !eof() && header[1] == ACK;
|
||||
}
|
||||
|
||||
public Ack readAck() throws IOException {
|
||||
if(!hasAck()) throw new FormatException();
|
||||
// Set up the reader
|
||||
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
|
||||
r.addConsumer(counting);
|
||||
// Read the start of the struct
|
||||
r.readStructStart(ACK);
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the message IDs
|
||||
List<MessageId> acked = new ArrayList<MessageId>();
|
||||
r.readListStart();
|
||||
@@ -86,32 +129,41 @@ class PacketReaderImpl implements PacketReader {
|
||||
}
|
||||
if(acked.isEmpty()) throw new FormatException();
|
||||
r.readListEnd();
|
||||
// Read the end of the struct
|
||||
r.readStructEnd();
|
||||
// Reset the reader
|
||||
r.removeConsumer(counting);
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the ack
|
||||
return new Ack(Collections.unmodifiableList(acked));
|
||||
}
|
||||
|
||||
public boolean hasMessage() throws IOException {
|
||||
return r.hasStruct(MESSAGE);
|
||||
return !eof() && header[1] == MESSAGE;
|
||||
}
|
||||
|
||||
public UnverifiedMessage readMessage() throws IOException {
|
||||
return messageReader.readStruct(r);
|
||||
if(!hasMessage()) throw new FormatException();
|
||||
// Set up the reader
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read and build the message
|
||||
UnverifiedMessage m = messageReader.readObject(r);
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
return m;
|
||||
}
|
||||
|
||||
public boolean hasOffer() throws IOException {
|
||||
return r.hasStruct(OFFER);
|
||||
return !eof() && header[1] == OFFER;
|
||||
}
|
||||
|
||||
public Offer readOffer() throws IOException {
|
||||
if(!hasOffer()) throw new FormatException();
|
||||
// Set up the reader
|
||||
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
|
||||
r.addConsumer(counting);
|
||||
// Read the start of the struct
|
||||
r.readStructStart(OFFER);
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the message IDs
|
||||
List<MessageId> offered = new ArrayList<MessageId>();
|
||||
r.readListStart();
|
||||
@@ -123,27 +175,28 @@ class PacketReaderImpl implements PacketReader {
|
||||
}
|
||||
if(offered.isEmpty()) throw new FormatException();
|
||||
r.readListEnd();
|
||||
// Read the end of the struct
|
||||
r.readStructEnd();
|
||||
// Reset the reader
|
||||
r.removeConsumer(counting);
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the offer
|
||||
return new Offer(Collections.unmodifiableList(offered));
|
||||
}
|
||||
|
||||
public boolean hasRequest() throws IOException {
|
||||
return r.hasStruct(REQUEST);
|
||||
return !eof() && header[1] == REQUEST;
|
||||
}
|
||||
|
||||
public Request readRequest() throws IOException {
|
||||
if(!hasRequest()) throw new FormatException();
|
||||
// Set up the reader
|
||||
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
|
||||
r.addConsumer(counting);
|
||||
// Read the start of the struct
|
||||
r.readStructStart(REQUEST);
|
||||
// Read the message IDs
|
||||
List<MessageId> requested = new ArrayList<MessageId>();
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the message IDs
|
||||
r.readListStart();
|
||||
List<MessageId> requested = new ArrayList<MessageId>();
|
||||
while(!r.hasListEnd()) {
|
||||
byte[] b = r.readBytes(UniqueId.LENGTH);
|
||||
if(b.length != UniqueId.LENGTH)
|
||||
@@ -152,85 +205,134 @@ class PacketReaderImpl implements PacketReader {
|
||||
}
|
||||
if(requested.isEmpty()) throw new FormatException();
|
||||
r.readListEnd();
|
||||
// Read the end of the struct
|
||||
r.readStructEnd();
|
||||
// Reset the reader
|
||||
r.removeConsumer(counting);
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the request
|
||||
return new Request(Collections.unmodifiableList(requested));
|
||||
}
|
||||
|
||||
public boolean hasRetentionAck() throws IOException {
|
||||
return r.hasStruct(RETENTION_ACK);
|
||||
return !eof() && header[1] == RETENTION_ACK;
|
||||
}
|
||||
|
||||
public RetentionAck readRetentionAck() throws IOException {
|
||||
r.readStructStart(RETENTION_ACK);
|
||||
if(!hasRetentionAck()) throw new FormatException();
|
||||
// Set up the reader
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the version
|
||||
long version = r.readInteger();
|
||||
if(version < 0) throw new FormatException();
|
||||
r.readStructEnd();
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the retention ack
|
||||
return new RetentionAck(version);
|
||||
}
|
||||
|
||||
public boolean hasRetentionUpdate() throws IOException {
|
||||
return r.hasStruct(RETENTION_UPDATE);
|
||||
return !eof() && header[1] == RETENTION_UPDATE;
|
||||
}
|
||||
|
||||
public RetentionUpdate readRetentionUpdate() throws IOException {
|
||||
r.readStructStart(RETENTION_UPDATE);
|
||||
if(!hasRetentionUpdate()) throw new FormatException();
|
||||
// Set up the reader
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the retention time and version
|
||||
long retention = r.readInteger();
|
||||
if(retention < 0) throw new FormatException();
|
||||
long version = r.readInteger();
|
||||
if(version < 0) throw new FormatException();
|
||||
r.readStructEnd();
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the retention update
|
||||
return new RetentionUpdate(retention, version);
|
||||
}
|
||||
|
||||
public boolean hasSubscriptionAck() throws IOException {
|
||||
return r.hasStruct(SUBSCRIPTION_ACK);
|
||||
return !eof() && header[1] == SUBSCRIPTION_ACK;
|
||||
}
|
||||
|
||||
public SubscriptionAck readSubscriptionAck() throws IOException {
|
||||
r.readStructStart(SUBSCRIPTION_ACK);
|
||||
if(!hasSubscriptionAck()) throw new FormatException();
|
||||
// Set up the reader
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the version
|
||||
long version = r.readInteger();
|
||||
if(version < 0) throw new FormatException();
|
||||
r.readStructEnd();
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the subscription ack
|
||||
return new SubscriptionAck(version);
|
||||
}
|
||||
|
||||
public boolean hasSubscriptionUpdate() throws IOException {
|
||||
return r.hasStruct(SUBSCRIPTION_UPDATE);
|
||||
return !eof() && header[1] == SUBSCRIPTION_UPDATE;
|
||||
}
|
||||
|
||||
public SubscriptionUpdate readSubscriptionUpdate() throws IOException {
|
||||
return subscriptionUpdateReader.readStruct(r);
|
||||
if(!hasSubscriptionUpdate()) throw new FormatException();
|
||||
// Set up the reader
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read and build the subscription update
|
||||
SubscriptionUpdate u = subscriptionUpdateReader.readObject(r);
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
return u;
|
||||
}
|
||||
|
||||
public boolean hasTransportAck() throws IOException {
|
||||
return r.hasStruct(TRANSPORT_ACK);
|
||||
return !eof() && header[1] == TRANSPORT_ACK;
|
||||
}
|
||||
|
||||
public TransportAck readTransportAck() throws IOException {
|
||||
r.readStructStart(TRANSPORT_ACK);
|
||||
if(!hasTransportAck()) throw new FormatException();
|
||||
// Set up the reader
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the transport ID and version
|
||||
String idString = r.readString(MAX_TRANSPORT_ID_LENGTH);
|
||||
if(idString.length() == 0) throw new FormatException();
|
||||
TransportId id = new TransportId(idString);
|
||||
long version = r.readInteger();
|
||||
if(version < 0) throw new FormatException();
|
||||
r.readStructEnd();
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the transport ack
|
||||
return new TransportAck(id, version);
|
||||
}
|
||||
|
||||
public boolean hasTransportUpdate() throws IOException {
|
||||
return r.hasStruct(TRANSPORT_UPDATE);
|
||||
return !eof() && header[1] == TRANSPORT_UPDATE;
|
||||
}
|
||||
|
||||
public TransportUpdate readTransportUpdate() throws IOException {
|
||||
if(!hasTransportUpdate()) throw new FormatException();
|
||||
// Set up the reader
|
||||
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
|
||||
r.addConsumer(counting);
|
||||
// Read the start of the struct
|
||||
r.readStructStart(TRANSPORT_UPDATE);
|
||||
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
|
||||
Reader r = readerFactory.createReader(bais);
|
||||
// Read the start of the payload
|
||||
r.readListStart();
|
||||
// Read the transport ID
|
||||
String idString = r.readString(MAX_TRANSPORT_ID_LENGTH);
|
||||
if(idString.length() == 0) throw new FormatException();
|
||||
@@ -249,10 +351,10 @@ class PacketReaderImpl implements PacketReader {
|
||||
// Read the version number
|
||||
long version = r.readInteger();
|
||||
if(version < 0) throw new FormatException();
|
||||
// Read the end of the struct
|
||||
r.readStructEnd();
|
||||
// Reset the reader
|
||||
r.removeConsumer(counting);
|
||||
// Read the end of the payload
|
||||
r.readListEnd();
|
||||
if(!r.eof()) throw new FormatException();
|
||||
state = State.BUFFER_EMPTY;
|
||||
// Build and return the transport update
|
||||
return new TransportUpdate(id, new TransportProperties(p), version);
|
||||
}
|
||||
|
||||
@@ -1,17 +1,19 @@
|
||||
package org.briarproject.messaging;
|
||||
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
import static org.briarproject.api.messaging.Types.ACK;
|
||||
import static org.briarproject.api.messaging.Types.GROUP;
|
||||
import static org.briarproject.api.messaging.Types.OFFER;
|
||||
import static org.briarproject.api.messaging.Types.REQUEST;
|
||||
import static org.briarproject.api.messaging.Types.RETENTION_ACK;
|
||||
import static org.briarproject.api.messaging.Types.RETENTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_ACK;
|
||||
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.Types.TRANSPORT_ACK;
|
||||
import static org.briarproject.api.messaging.Types.TRANSPORT_UPDATE;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.HEADER_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PAYLOAD_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.PROTOCOL_VERSION;
|
||||
import static org.briarproject.api.messaging.PacketTypes.ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.OFFER;
|
||||
import static org.briarproject.api.messaging.PacketTypes.REQUEST;
|
||||
import static org.briarproject.api.messaging.PacketTypes.RETENTION_ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.RETENTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.PacketTypes.SUBSCRIPTION_ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.SUBSCRIPTION_UPDATE;
|
||||
import static org.briarproject.api.messaging.PacketTypes.TRANSPORT_ACK;
|
||||
import static org.briarproject.api.messaging.PacketTypes.TRANSPORT_UPDATE;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
@@ -19,6 +21,7 @@ import org.briarproject.api.messaging.Ack;
|
||||
import org.briarproject.api.messaging.Group;
|
||||
import org.briarproject.api.messaging.MessageId;
|
||||
import org.briarproject.api.messaging.Offer;
|
||||
import org.briarproject.api.messaging.PacketTypes;
|
||||
import org.briarproject.api.messaging.PacketWriter;
|
||||
import org.briarproject.api.messaging.Request;
|
||||
import org.briarproject.api.messaging.RetentionAck;
|
||||
@@ -30,118 +33,161 @@ import org.briarproject.api.messaging.TransportUpdate;
|
||||
import org.briarproject.api.serial.SerialComponent;
|
||||
import org.briarproject.api.serial.Writer;
|
||||
import org.briarproject.api.serial.WriterFactory;
|
||||
import org.briarproject.util.ByteUtils;
|
||||
|
||||
// This class is not thread-safe
|
||||
class PacketWriterImpl implements PacketWriter {
|
||||
|
||||
private final SerialComponent serial;
|
||||
private final WriterFactory writerFactory;
|
||||
private final OutputStream out;
|
||||
private final Writer w;
|
||||
private final byte[] header;
|
||||
private final ByteArrayOutputStream payload;
|
||||
|
||||
PacketWriterImpl(SerialComponent serial, WriterFactory writerFactory,
|
||||
OutputStream out) {
|
||||
this.serial = serial;
|
||||
this.writerFactory = writerFactory;
|
||||
this.out = out;
|
||||
w = writerFactory.createWriter(out);
|
||||
header = new byte[HEADER_LENGTH];
|
||||
header[0] = PROTOCOL_VERSION;
|
||||
payload = new ByteArrayOutputStream(MAX_PAYLOAD_LENGTH);
|
||||
}
|
||||
|
||||
public int getMaxMessagesForAck(long capacity) {
|
||||
return getMaxMessagesForPacket(capacity, ACK);
|
||||
return getMaxMessagesForPacket(capacity);
|
||||
}
|
||||
|
||||
public int getMaxMessagesForRequest(long capacity) {
|
||||
return getMaxMessagesForPacket(capacity, REQUEST);
|
||||
return getMaxMessagesForPacket(capacity);
|
||||
}
|
||||
|
||||
public int getMaxMessagesForOffer(long capacity) {
|
||||
return getMaxMessagesForPacket(capacity, OFFER);
|
||||
return getMaxMessagesForPacket(capacity);
|
||||
}
|
||||
|
||||
private int getMaxMessagesForPacket(long capacity, int structId) {
|
||||
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
|
||||
int overhead = serial.getSerialisedStructStartLength(structId)
|
||||
+ serial.getSerialisedListStartLength()
|
||||
+ serial.getSerialisedListEndLength()
|
||||
+ serial.getSerialisedStructEndLength();
|
||||
private int getMaxMessagesForPacket(long capacity) {
|
||||
int payload = (int) Math.min(capacity - HEADER_LENGTH,
|
||||
MAX_PAYLOAD_LENGTH);
|
||||
int overhead = serial.getSerialisedListStartLength() * 2
|
||||
+ serial.getSerialisedListEndLength() * 2;
|
||||
int idLength = serial.getSerialisedUniqueIdLength();
|
||||
return (packet - overhead) / idLength;
|
||||
return (payload - overhead) / idLength;
|
||||
}
|
||||
|
||||
private void writePacket(byte packetType) throws IOException {
|
||||
header[1] = packetType;
|
||||
ByteUtils.writeUint16(payload.size(), header, 2);
|
||||
out.write(header);
|
||||
payload.writeTo(out);
|
||||
payload.reset();
|
||||
}
|
||||
|
||||
public void writeAck(Ack a) throws IOException {
|
||||
w.writeStructStart(ACK);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeListStart();
|
||||
for(MessageId m : a.getMessageIds()) w.writeBytes(m.getBytes());
|
||||
w.writeListEnd();
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(ACK);
|
||||
}
|
||||
|
||||
public void writeMessage(byte[] raw) throws IOException {
|
||||
header[1] = PacketTypes.MESSAGE;
|
||||
ByteUtils.writeUint16(raw.length, header, 2);
|
||||
out.write(header);
|
||||
out.write(raw);
|
||||
}
|
||||
|
||||
public void writeOffer(Offer o) throws IOException {
|
||||
w.writeStructStart(OFFER);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeListStart();
|
||||
for(MessageId m : o.getMessageIds()) w.writeBytes(m.getBytes());
|
||||
w.writeListEnd();
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(OFFER);
|
||||
}
|
||||
|
||||
public void writeRequest(Request r) throws IOException {
|
||||
w.writeStructStart(REQUEST);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeListStart();
|
||||
for(MessageId m : r.getMessageIds()) w.writeBytes(m.getBytes());
|
||||
w.writeListEnd();
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(REQUEST);
|
||||
}
|
||||
|
||||
public void writeRetentionAck(RetentionAck a) throws IOException {
|
||||
w.writeStructStart(RETENTION_ACK);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeInteger(a.getVersion());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(RETENTION_ACK);
|
||||
}
|
||||
|
||||
public void writeRetentionUpdate(RetentionUpdate u) throws IOException {
|
||||
w.writeStructStart(RETENTION_UPDATE);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeInteger(u.getRetentionTime());
|
||||
w.writeInteger(u.getVersion());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(RETENTION_UPDATE);
|
||||
}
|
||||
|
||||
public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
|
||||
w.writeStructStart(SUBSCRIPTION_ACK);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeInteger(a.getVersion());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(SUBSCRIPTION_ACK);
|
||||
}
|
||||
|
||||
public void writeSubscriptionUpdate(SubscriptionUpdate u)
|
||||
throws IOException {
|
||||
w.writeStructStart(SUBSCRIPTION_UPDATE);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeListStart();
|
||||
for(Group g : u.getGroups()) {
|
||||
w.writeStructStart(GROUP);
|
||||
w.writeListStart();
|
||||
w.writeString(g.getName());
|
||||
w.writeBytes(g.getSalt());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
}
|
||||
w.writeListEnd();
|
||||
w.writeInteger(u.getVersion());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(SUBSCRIPTION_UPDATE);
|
||||
}
|
||||
|
||||
public void writeTransportAck(TransportAck a) throws IOException {
|
||||
w.writeStructStart(TRANSPORT_ACK);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeString(a.getId().getString());
|
||||
w.writeInteger(a.getVersion());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(TRANSPORT_ACK);
|
||||
}
|
||||
|
||||
public void writeTransportUpdate(TransportUpdate u) throws IOException {
|
||||
w.writeStructStart(TRANSPORT_UPDATE);
|
||||
assert payload.size() == 0;
|
||||
Writer w = writerFactory.createWriter(payload);
|
||||
w.writeListStart();
|
||||
w.writeString(u.getId().getString());
|
||||
w.writeMap(u.getProperties());
|
||||
w.writeInteger(u.getVersion());
|
||||
w.writeStructEnd();
|
||||
w.writeListEnd();
|
||||
writePacket(TRANSPORT_UPDATE);
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
|
||||
@@ -2,7 +2,7 @@ package org.briarproject.messaging;
|
||||
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PAYLOAD_LENGTH;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
@@ -167,7 +167,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
|
||||
if(interrupted) return;
|
||||
try {
|
||||
Collection<byte[]> b = db.generateBatch(contactId,
|
||||
MAX_PACKET_LENGTH, maxLatency);
|
||||
MAX_PAYLOAD_LENGTH, maxLatency);
|
||||
if(LOG.isLoggable(INFO))
|
||||
LOG.info("Generated batch: " + (b != null));
|
||||
if(b == null) decrementOutstandingQueries();
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
package org.briarproject.messaging;
|
||||
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_PAYLOAD_LENGTH;
|
||||
import static org.briarproject.api.messaging.MessagingConstants.MAX_SUBSCRIPTIONS;
|
||||
import static org.briarproject.api.messaging.Types.SUBSCRIPTION_UPDATE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@@ -17,29 +16,29 @@ import org.briarproject.api.messaging.GroupId;
|
||||
import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.serial.Consumer;
|
||||
import org.briarproject.api.serial.CountingConsumer;
|
||||
import org.briarproject.api.serial.ObjectReader;
|
||||
import org.briarproject.api.serial.Reader;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
|
||||
class SubscriptionUpdateReader implements StructReader<SubscriptionUpdate> {
|
||||
class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
|
||||
|
||||
private final StructReader<Group> groupReader;
|
||||
private final ObjectReader<Group> groupReader;
|
||||
|
||||
SubscriptionUpdateReader(StructReader<Group> groupReader) {
|
||||
SubscriptionUpdateReader(ObjectReader<Group> groupReader) {
|
||||
this.groupReader = groupReader;
|
||||
}
|
||||
|
||||
public SubscriptionUpdate readStruct(Reader r) throws IOException {
|
||||
public SubscriptionUpdate readObject(Reader r) throws IOException {
|
||||
// Set up the reader
|
||||
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
|
||||
Consumer counting = new CountingConsumer(MAX_PAYLOAD_LENGTH);
|
||||
r.addConsumer(counting);
|
||||
// Read the start of the struct
|
||||
r.readStructStart(SUBSCRIPTION_UPDATE);
|
||||
// Read the start of the update
|
||||
r.readListStart();
|
||||
// Read the subscriptions, rejecting duplicates
|
||||
List<Group> groups = new ArrayList<Group>();
|
||||
Set<GroupId> ids = new HashSet<GroupId>();
|
||||
r.readListStart();
|
||||
for(int i = 0; i < MAX_SUBSCRIPTIONS && !r.hasListEnd(); i++) {
|
||||
Group g = groupReader.readStruct(r);
|
||||
Group g = groupReader.readObject(r);
|
||||
if(!ids.add(g.getId())) throw new FormatException(); // Duplicate
|
||||
groups.add(g);
|
||||
}
|
||||
@@ -47,8 +46,8 @@ class SubscriptionUpdateReader implements StructReader<SubscriptionUpdate> {
|
||||
// Read the version number
|
||||
long version = r.readInteger();
|
||||
if(version < 0) throw new FormatException();
|
||||
// Read the end of the struct
|
||||
r.readStructEnd();
|
||||
// Read the end of the update
|
||||
r.readListEnd();
|
||||
// Reset the reader
|
||||
r.removeConsumer(counting);
|
||||
// Build and return the subscription update
|
||||
|
||||
Reference in New Issue
Block a user