Removed SubscriptionUpdate and SubscriptionAck.

This commit is contained in:
akwizgran
2016-01-27 16:31:07 +00:00
parent e85b2161e4
commit 9fdc510843
27 changed files with 27 additions and 885 deletions

View File

@@ -16,6 +16,7 @@ import javax.inject.Inject;
import static org.briarproject.api.db.StorageStatus.ADDING;
// TODO: Move this class to the identity package
class AuthorFactoryImpl implements AuthorFactory {
private final CryptoComponent crypto;

View File

@@ -11,6 +11,7 @@ import java.io.IOException;
import static org.briarproject.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
import static org.briarproject.api.identity.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
// TODO: Move this class to the identity package
class AuthorReader implements ObjectReader<Author> {
private final AuthorFactory authorFactory;

View File

@@ -21,8 +21,6 @@ import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.system.Clock;
@@ -87,9 +85,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
public void run() throws IOException {
eventBus.addListener(this);
try {
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
// Start a query for each type of packet
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
@@ -118,7 +114,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
now = clock.currentTimeMillis();
if (now >= nextRetxQuery) {
// Check for retransmittable packets
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
nextRetxQuery = now + RETX_QUERY_INTERVAL;
@@ -163,10 +158,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
if (l.getAffectedContacts().contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate());
if (l.getAffectedContacts().contains(contactId))
dbExecutor.execute(new GenerateOffer());
}
} else if (e instanceof MessageRequestedEvent) {
if (((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch());
@@ -179,10 +172,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof RemoteSubscriptionsUpdatedEvent) {
RemoteSubscriptionsUpdatedEvent r =
(RemoteSubscriptionsUpdatedEvent) e;
if (r.getContactId().equals(contactId)) {
dbExecutor.execute(new GenerateSubscriptionAck());
if (r.getContactId().equals(contactId))
dbExecutor.execute(new GenerateOffer());
}
} else if (e instanceof ShutdownEvent) {
interrupt();
} else if (e instanceof TransportRemovedEvent) {
@@ -332,75 +323,4 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateRequest());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if (a != null) writerTasks.add(new WriteSubscriptionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck
implements ThrowingRunnable<IOException> {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
}
}
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if (u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate
implements ThrowingRunnable<IOException> {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
}

View File

@@ -1,31 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.UniqueId;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import java.io.IOException;
import static org.briarproject.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
class GroupReader implements ObjectReader<Group> {
private final GroupFactory groupFactory;
GroupReader(GroupFactory groupFactory) {
this.groupFactory = groupFactory;
}
public Group readObject(BdfReader r) throws IOException {
r.readListStart();
byte[] id = r.readRaw(UniqueId.LENGTH);
if (id.length != UniqueId.LENGTH) throw new FormatException();
byte[] descriptor = r.readRaw(MAX_GROUP_DESCRIPTOR_LENGTH);
r.readListEnd();
return groupFactory.createGroup(new ClientId(id), descriptor);
}
}

View File

@@ -16,8 +16,6 @@ import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import java.io.IOException;
@@ -69,12 +67,6 @@ class IncomingSession implements SyncSession, EventListener {
} else if (packetReader.hasRequest()) {
Request r = packetReader.readRequest();
dbExecutor.execute(new ReceiveRequest(r));
} else if (packetReader.hasSubscriptionAck()) {
SubscriptionAck a = packetReader.readSubscriptionAck();
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if (packetReader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = packetReader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else {
throw new FormatException();
}
@@ -172,40 +164,4 @@ class IncomingSession implements SyncSession, EventListener {
}
}
}
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private ReceiveSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveSubscriptionAck(contactId, ack);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
private ReceiveSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveSubscriptionUpdate(contactId, update);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
}

View File

@@ -1,11 +1,8 @@
package org.briarproject.sync;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.SubscriptionUpdate;
import java.io.InputStream;
@@ -14,20 +11,13 @@ import javax.inject.Inject;
class PacketReaderFactoryImpl implements PacketReaderFactory {
private final CryptoComponent crypto;
private final BdfReaderFactory bdfReaderFactory;
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
@Inject
PacketReaderFactoryImpl(CryptoComponent crypto,
BdfReaderFactory bdfReaderFactory,
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader) {
PacketReaderFactoryImpl(CryptoComponent crypto) {
this.crypto = crypto;
this.bdfReaderFactory = bdfReaderFactory;
this.subscriptionUpdateReader = subscriptionUpdateReader;
}
public PacketReader createPacketReader(InputStream in) {
return new PacketReaderImpl(crypto, bdfReaderFactory,
subscriptionUpdateReader, in);
return new PacketReaderImpl(crypto, in);
}
}

View File

@@ -3,9 +3,6 @@ package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.UniqueId;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
@@ -13,11 +10,8 @@ import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -28,8 +22,6 @@ import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.MESSAGE;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
@@ -41,20 +33,14 @@ class PacketReaderImpl implements PacketReader {
private enum State { BUFFER_EMPTY, BUFFER_FULL, EOF }
private final CryptoComponent crypto;
private final BdfReaderFactory bdfReaderFactory;
private final ObjectReader<SubscriptionUpdate> subscriptionUpdateReader;
private final InputStream in;
private final byte[] header, payload;
private State state = State.BUFFER_EMPTY;
private int payloadLength = 0;
PacketReaderImpl(CryptoComponent crypto, BdfReaderFactory bdfReaderFactory,
ObjectReader<SubscriptionUpdate> subscriptionUpdateReader,
InputStream in) {
PacketReaderImpl(CryptoComponent crypto, InputStream in) {
this.crypto = crypto;
this.bdfReaderFactory = bdfReaderFactory;
this.subscriptionUpdateReader = subscriptionUpdateReader;
this.in = in;
header = new byte[PACKET_HEADER_LENGTH];
payload = new byte[MAX_PACKET_PAYLOAD_LENGTH];
@@ -156,42 +142,4 @@ class PacketReaderImpl implements PacketReader {
if (!hasRequest()) throw new FormatException();
return new Request(Collections.unmodifiableList(readMessageIds()));
}
public boolean hasSubscriptionAck() throws IOException {
return !eof() && header[1] == SUBSCRIPTION_ACK;
}
public SubscriptionAck readSubscriptionAck() throws IOException {
if (!hasSubscriptionAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the version
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the subscription ack
return new SubscriptionAck(version);
}
public boolean hasSubscriptionUpdate() throws IOException {
return !eof() && header[1] == SUBSCRIPTION_UPDATE;
}
public SubscriptionUpdate readSubscriptionUpdate() throws IOException {
if (!hasSubscriptionUpdate()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read and build the subscription update
SubscriptionUpdate u = subscriptionUpdateReader.readObject(r);
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
return u;
}
}

View File

@@ -1,23 +1,13 @@
package org.briarproject.sync;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.PacketWriterFactory;
import java.io.OutputStream;
import javax.inject.Inject;
class PacketWriterFactoryImpl implements PacketWriterFactory {
private final BdfWriterFactory bdfWriterFactory;
@Inject
PacketWriterFactoryImpl(BdfWriterFactory bdfWriterFactory) {
this.bdfWriterFactory = bdfWriterFactory;
}
public PacketWriter createPacketWriter(OutputStream out) {
return new PacketWriterImpl(bdfWriterFactory, out);
return new PacketWriterImpl(out);
}
}

View File

@@ -1,17 +1,12 @@
package org.briarproject.sync;
import org.briarproject.api.UniqueId;
import org.briarproject.api.data.BdfWriter;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketTypes;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayOutputStream;
@@ -21,8 +16,6 @@ import java.io.OutputStream;
import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PROTOCOL_VERSION;
@@ -30,13 +23,11 @@ import static org.briarproject.api.sync.SyncConstants.PROTOCOL_VERSION;
// This class is not thread-safe
class PacketWriterImpl implements PacketWriter {
private final BdfWriterFactory bdfWriterFactory;
private final OutputStream out;
private final byte[] header;
private final ByteArrayOutputStream payload;
PacketWriterImpl(BdfWriterFactory bdfWriterFactory, OutputStream out) {
this.bdfWriterFactory = bdfWriterFactory;
PacketWriterImpl(OutputStream out) {
this.out = out;
header = new byte[PACKET_HEADER_LENGTH];
header[0] = PROTOCOL_VERSION;
@@ -94,33 +85,6 @@ class PacketWriterImpl implements PacketWriter {
writePacket(REQUEST);
}
public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeInteger(a.getVersion());
w.writeListEnd();
writePacket(SUBSCRIPTION_ACK);
}
public void writeSubscriptionUpdate(SubscriptionUpdate u)
throws IOException {
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeListStart();
for (Group g : u.getGroups()) {
w.writeListStart();
w.writeRaw(g.getClientId().getBytes());
w.writeRaw(g.getDescriptor());
w.writeListEnd();
}
w.writeListEnd();
w.writeInteger(u.getVersion());
w.writeListEnd();
writePacket(SUBSCRIPTION_UPDATE);
}
public void flush() throws IOException {
out.flush();
}

View File

@@ -12,8 +12,6 @@ import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import java.io.IOException;
@@ -66,16 +64,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
this.transportId = transportId;
this.maxLatency = maxLatency;
this.packetWriter = packetWriter;
outstandingQueries = new AtomicInteger(4); // One per type of packet
outstandingQueries = new AtomicInteger(2); // One per type of packet
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
public void run() throws IOException {
eventBus.addListener(this);
try {
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
// Start a query for each type of packet
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
// Write packets until interrupted or no more packets to write
@@ -187,77 +183,4 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateBatch());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteSubscriptionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck
implements ThrowingRunnable<IOException> {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
}
}
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if (u == null) decrementOutstandingQueries();
else writerTasks.add(new WriteSubscriptionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate
implements ThrowingRunnable<IOException> {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
}

View File

@@ -1,44 +0,0 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.SubscriptionUpdate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.briarproject.api.sync.SyncConstants.MAX_SUBSCRIPTIONS;
class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
private final ObjectReader<Group> groupReader;
SubscriptionUpdateReader(ObjectReader<Group> groupReader) {
this.groupReader = groupReader;
}
public SubscriptionUpdate readObject(BdfReader r) throws IOException {
r.readListStart();
List<Group> groups = new ArrayList<Group>();
Set<GroupId> ids = new HashSet<GroupId>();
r.readListStart();
for (int i = 0; i < MAX_SUBSCRIPTIONS && !r.hasListEnd(); i++) {
Group g = groupReader.readObject(r);
if (!ids.add(g.getId())) throw new FormatException(); // Duplicate
groups.add(g);
}
r.readListEnd();
long version = r.readInteger();
if (version < 0) throw new FormatException();
r.readListEnd();
groups = Collections.unmodifiableList(groups);
return new SubscriptionUpdate(groups, version);
}
}

View File

@@ -8,13 +8,11 @@ import org.briarproject.api.event.EventBus;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorFactory;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.MessageFactory;
import org.briarproject.api.sync.PacketReaderFactory;
import org.briarproject.api.sync.PacketWriterFactory;
import org.briarproject.api.sync.PrivateGroupFactory;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSessionFactory;
import org.briarproject.api.sync.ValidationManager;
@@ -39,17 +37,6 @@ public class SyncModule extends AbstractModule {
return new AuthorReader(authorFactory);
}
@Provides
ObjectReader<Group> getGroupReader(GroupFactory groupFactory) {
return new GroupReader(groupFactory);
}
@Provides
ObjectReader<SubscriptionUpdate> getSubscriptionUpdateReader(
ObjectReader<Group> groupReader) {
return new SubscriptionUpdateReader(groupReader);
}
@Provides @Singleton
ValidationManager getValidationManager(LifecycleManager lifecycleManager,
EventBus eventBus, ValidationManagerImpl validationManager) {