Updated simplex and duplex connections to handle the new packet types.

This commit is contained in:
akwizgran
2013-01-30 15:35:35 +00:00
parent f5ec5b9569
commit c1d2891763
6 changed files with 293 additions and 33 deletions

View File

@@ -10,26 +10,24 @@ import net.sf.briar.api.serial.ReaderFactory;
import net.sf.briar.api.serial.StructReader; import net.sf.briar.api.serial.StructReader;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Provider;
// FIXME: See whether these providers can be got rid of
class ProtocolReaderFactoryImpl implements ProtocolReaderFactory { class ProtocolReaderFactoryImpl implements ProtocolReaderFactory {
private final ReaderFactory readerFactory; private final ReaderFactory readerFactory;
private final Provider<StructReader<UnverifiedMessage>> messageProvider; private final StructReader<UnverifiedMessage> messageReader;
private final Provider<StructReader<SubscriptionUpdate>> subscriptionUpdateProvider; private final StructReader<SubscriptionUpdate> subscriptionUpdateReader;
@Inject @Inject
ProtocolReaderFactoryImpl(ReaderFactory readerFactory, ProtocolReaderFactoryImpl(ReaderFactory readerFactory,
Provider<StructReader<UnverifiedMessage>> messageProvider, StructReader<UnverifiedMessage> messageReader,
Provider<StructReader<SubscriptionUpdate>> subscriptionUpdateProvider) { StructReader<SubscriptionUpdate> subscriptionUpdateReader) {
this.readerFactory = readerFactory; this.readerFactory = readerFactory;
this.messageProvider = messageProvider; this.messageReader = messageReader;
this.subscriptionUpdateProvider = subscriptionUpdateProvider; this.subscriptionUpdateReader = subscriptionUpdateReader;
} }
public ProtocolReader createProtocolReader(InputStream in) { public ProtocolReader createProtocolReader(InputStream in) {
return new ProtocolReaderImpl(readerFactory, messageProvider.get(), return new ProtocolReaderImpl(readerFactory, messageReader,
subscriptionUpdateProvider.get(), in); subscriptionUpdateReader, in);
} }
} }

View File

@@ -27,10 +27,10 @@ import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.event.ContactRemovedEvent; import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseEvent; import net.sf.briar.api.db.event.DatabaseEvent;
import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.MessageReceivedEvent;
import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent;
import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent; import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.MessageReceivedEvent;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection; import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -42,7 +42,11 @@ import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriter; import net.sf.briar.api.protocol.ProtocolWriter;
import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.Request; import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportId; import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedMessage; import net.sf.briar.api.protocol.UnverifiedMessage;
@@ -55,7 +59,6 @@ import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory; import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.util.ByteUtils; import net.sf.briar.util.ByteUtils;
// FIXME: Read and write subscription and transport acks
abstract class DuplexConnection implements DatabaseListener { abstract class DuplexConnection implements DatabaseListener {
private static final Logger LOG = private static final Logger LOG =
@@ -208,9 +211,13 @@ abstract class DuplexConnection implements DatabaseListener {
OutputStream out = createConnectionWriter().getOutputStream(); OutputStream out = createConnectionWriter().getOutputStream();
writer = protoWriterFactory.createProtocolWriter(out, writer = protoWriterFactory.createProtocolWriter(out,
transport.shouldFlush()); transport.shouldFlush());
// Send the initial packets: transports, subs, acks, offer // Send the initial packets: updates, acks, offer
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdate()); dbExecutor.execute(new GenerateTransportUpdate());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate()); dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateAcks());
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
// Main loop // Main loop
@@ -519,6 +526,105 @@ abstract class DuplexConnection implements DatabaseListener {
} }
} }
// This task runs on a database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(a != null) writerTasks.add(new WriteRetentionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements Runnable {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeRetentionAck(ack);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
try {
RetentionUpdate u = db.generateRetentionUpdate(contactId);
if(u != null) writerTasks.add(new WriteRetentionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
assert writer != null;
try {
writer.writeRetentionUpdate(update);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(a != null) writerTasks.add(new WriteSubscriptionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeSubscriptionAck(ack);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread // This task runs on a database thread
private class GenerateSubscriptionUpdate implements Runnable { private class GenerateSubscriptionUpdate implements Runnable {
@@ -552,6 +658,40 @@ abstract class DuplexConnection implements DatabaseListener {
} }
} }
// This task runs on a database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if(acks != null) writerTasks.add(new WriteTransportAcks(acks));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements Runnable {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() {
assert writer != null;
try {
for(TransportAck a : acks) writer.writeTransportAck(a);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on a database thread // This task runs on a database thread
private class GenerateTransportUpdate implements Runnable { private class GenerateTransportUpdate implements Runnable {

View File

@@ -19,7 +19,11 @@ import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageVerifier; import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.ProtocolReader; import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportId; import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedMessage; import net.sf.briar.api.protocol.UnverifiedMessage;
@@ -30,7 +34,6 @@ import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionRegistry; import net.sf.briar.api.transport.ConnectionRegistry;
import net.sf.briar.util.ByteUtils; import net.sf.briar.util.ByteUtils;
// FIXME: Read subscription and transport acks
class IncomingSimplexConnection { class IncomingSimplexConnection {
private static final Logger LOG = private static final Logger LOG =
@@ -82,9 +85,21 @@ class IncomingSimplexConnection {
} else if(reader.hasMessage()) { } else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage(); UnverifiedMessage m = reader.readMessage();
verificationExecutor.execute(new VerifyMessage(m)); verificationExecutor.execute(new VerifyMessage(m));
} else if(reader.hasRetentionAck()) {
RetentionAck a = reader.readRetentionAck();
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if(reader.hasRetentionUpdate()) {
RetentionUpdate u = reader.readRetentionUpdate();
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if(reader.hasSubscriptionAck()) {
SubscriptionAck a = reader.readSubscriptionAck();
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if(reader.hasSubscriptionUpdate()) { } else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = reader.readSubscriptionUpdate(); SubscriptionUpdate u = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(u)); dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportAck()) {
TransportAck a = reader.readTransportAck();
dbExecutor.execute(new ReceiveTransportAck(a));
} else if(reader.hasTransportUpdate()) { } else if(reader.hasTransportUpdate()) {
TransportUpdate u = reader.readTransportUpdate(); TransportUpdate u = reader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(u)); dbExecutor.execute(new ReceiveTransportUpdate(u));
@@ -162,6 +177,57 @@ class IncomingSimplexConnection {
} }
} }
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
private ReceiveRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveRetentionAck(contactId, ack);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private ReceiveRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveRetentionUpdate(contactId, update);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private ReceiveSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveSubscriptionAck(contactId, ack);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveSubscriptionUpdate implements Runnable { private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update; private final SubscriptionUpdate update;
@@ -179,6 +245,23 @@ class IncomingSimplexConnection {
} }
} }
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
private ReceiveTransportAck(TransportAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveTransportAck(contactId, ack);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveTransportUpdate implements Runnable { private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update; private final TransportUpdate update;

View File

@@ -16,7 +16,11 @@ import net.sf.briar.api.plugins.simplex.SimplexTransportWriter;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.ProtocolWriter; import net.sf.briar.api.protocol.ProtocolWriter;
import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportId; import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionContext;
@@ -25,7 +29,6 @@ import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory; import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.util.ByteUtils; import net.sf.briar.util.ByteUtils;
// FIXME: Write subscription and transport acks
class OutgoingSimplexConnection { class OutgoingSimplexConnection {
private static final Logger LOG = private static final Logger LOG =
@@ -55,6 +58,8 @@ class OutgoingSimplexConnection {
transportId = ctx.getTransportId(); transportId = ctx.getTransportId();
} }
// FIXME: Write each packet to a buffer, check for capacity before writing
// it to the connection (except raw messages, which are already serialised)
void write() { void write() {
connRegistry.registerConnection(contactId, transportId); connRegistry.registerConnection(contactId, transportId);
try { try {
@@ -62,22 +67,33 @@ class OutgoingSimplexConnection {
transport.getOutputStream(), transport.getCapacity(), transport.getOutputStream(), transport.getCapacity(),
ctx, false, true); ctx, false, true);
OutputStream out = conn.getOutputStream(); OutputStream out = conn.getOutputStream();
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH)
throw new EOFException();
ProtocolWriter writer = protoFactory.createProtocolWriter(out, ProtocolWriter writer = protoFactory.createProtocolWriter(out,
transport.shouldFlush()); transport.shouldFlush());
// There should be enough space for a packet // Send the initial packets: updates and acks
long capacity = conn.getRemainingCapacity(); Collection<TransportAck> transportAcks =
if(capacity < MAX_PACKET_LENGTH) throw new EOFException(); db.generateTransportAcks(contactId);
// Write transport updates. FIXME: Check for space if(transportAcks != null) {
Collection<TransportUpdate> updates = for(TransportAck ta : transportAcks)
db.generateTransportUpdates(contactId); writer.writeTransportAck(ta);
if(updates != null) {
for(TransportUpdate u : updates) writer.writeTransportUpdate(u);
} }
// Write a subscription update. FIXME: Check for space Collection<TransportUpdate> transportUpdates =
SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId); db.generateTransportUpdates(contactId);
if(u != null) writer.writeSubscriptionUpdate(u); if(transportUpdates != null) {
for(TransportUpdate tu : transportUpdates)
writer.writeTransportUpdate(tu);
}
SubscriptionAck sa = db.generateSubscriptionAck(contactId);
if(sa != null) writer.writeSubscriptionAck(sa);
SubscriptionUpdate su = db.generateSubscriptionUpdate(contactId);
if(su != null) writer.writeSubscriptionUpdate(su);
RetentionAck ra = db.generateRetentionAck(contactId);
if(ra != null) writer.writeRetentionAck(ra);
RetentionUpdate ru = db.generateRetentionUpdate(contactId);
if(ru != null) writer.writeRetentionUpdate(ru);
// Write acks until you can't write acks no more // Write acks until you can't write acks no more
capacity = conn.getRemainingCapacity(); long capacity = conn.getRemainingCapacity();
int maxMessages = writer.getMaxMessagesForAck(capacity); int maxMessages = writer.getMaxMessagesForAck(capacity);
Ack a = db.generateAck(contactId, maxMessages); Ack a = db.generateAck(contactId, maxMessages);
while(a != null) { while(a != null) {

View File

@@ -41,7 +41,6 @@ import org.jmock.Mockery;
import org.junit.Test; import org.junit.Test;
// FIXME: Replace allowing() with oneOf() to tighten up tests // FIXME: Replace allowing() with oneOf() to tighten up tests
public abstract class DatabaseComponentTest extends BriarTestCase { public abstract class DatabaseComponentTest extends BriarTestCase {
protected final Object txn = new Object(); protected final Object txn = new Object();

View File

@@ -108,12 +108,24 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db, OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db,
connRegistry, connFactory, protoFactory, ctx, transport); connRegistry, connFactory, protoFactory, ctx, transport);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// No transports to send // No transport acks to send
oneOf(db).generateTransportAcks(contactId);
will(returnValue(null));
// No transport updates to send
oneOf(db).generateTransportUpdates(contactId); oneOf(db).generateTransportUpdates(contactId);
will(returnValue(null)); will(returnValue(null));
// No subscriptions to send // No subscription ack to send
oneOf(db).generateSubscriptionAck(contactId);
will(returnValue(null));
// No subscription update to send
oneOf(db).generateSubscriptionUpdate(contactId); oneOf(db).generateSubscriptionUpdate(contactId);
will(returnValue(null)); will(returnValue(null));
// No retention ack to send
oneOf(db).generateRetentionAck(contactId);
will(returnValue(null));
// No retention update to send
oneOf(db).generateRetentionUpdate(contactId);
will(returnValue(null));
// No acks to send // No acks to send
oneOf(db).generateAck(with(contactId), with(any(int.class))); oneOf(db).generateAck(with(contactId), with(any(int.class)));
will(returnValue(null)); will(returnValue(null));
@@ -141,12 +153,24 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
connRegistry, connFactory, protoFactory, ctx, transport); connRegistry, connFactory, protoFactory, ctx, transport);
final byte[] raw = new byte[1234]; final byte[] raw = new byte[1234];
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// No transports to send // No transport acks to send
oneOf(db).generateTransportAcks(contactId);
will(returnValue(null));
// No transport updates to send
oneOf(db).generateTransportUpdates(contactId); oneOf(db).generateTransportUpdates(contactId);
will(returnValue(null)); will(returnValue(null));
// No subscriptions to send // No subscription ack to send
oneOf(db).generateSubscriptionAck(contactId);
will(returnValue(null));
// No subscription update to send
oneOf(db).generateSubscriptionUpdate(contactId); oneOf(db).generateSubscriptionUpdate(contactId);
will(returnValue(null)); will(returnValue(null));
// No retention ack to send
oneOf(db).generateRetentionAck(contactId);
will(returnValue(null));
// No retention update to send
oneOf(db).generateRetentionUpdate(contactId);
will(returnValue(null));
// One ack to send // One ack to send
oneOf(db).generateAck(with(contactId), with(any(int.class))); oneOf(db).generateAck(with(contactId), with(any(int.class)));
will(returnValue(new Ack(Arrays.asList(messageId)))); will(returnValue(new Ack(Arrays.asList(messageId))));