Implemented incoming and outgoing batch connections (untested).

This commit is contained in:
akwizgran
2011-09-22 16:26:06 +01:00
parent b65d6631f1
commit 09971c8460
8 changed files with 244 additions and 2 deletions

View File

@@ -4,6 +4,8 @@ import java.io.IOException;
public interface ProtocolReader {
boolean eof() throws IOException;
boolean hasAck() throws IOException;
Ack readAck() throws IOException;

View File

@@ -17,5 +17,5 @@ public interface BatchTransportReader {
* be called even if the reader is not used, or if an exception is thrown
* while using the reader.
*/
void close() throws IOException;
void dispose() throws IOException;
}

View File

@@ -20,5 +20,5 @@ public interface BatchTransportWriter {
* be called even if the writer is not used, or if an exception is thrown
* while using the writer.
*/
void close() throws IOException;
void dispose() throws IOException;
}

View File

@@ -34,6 +34,10 @@ class ProtocolReaderImpl implements ProtocolReader {
reader.addObjectReader(Types.TRANSPORT_UPDATE, transportReader);
}
public boolean eof() throws IOException {
return reader.eof();
}
public boolean hasAck() throws IOException {
return reader.hasUserDefined(Types.ACK);
}

View File

@@ -0,0 +1,70 @@
package net.sf.briar.transport.batch;
import java.io.IOException;
import java.io.InputStream;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.batch.BatchTransportReader;
class IncomingBatchConnection {
private final BatchTransportReader trans;
private final ConnectionReaderFactory connFactory;
private final DatabaseComponent db;
private final ProtocolReaderFactory protoFactory;
private final int transportId;
private final long connection;
private final ContactId contactId;
IncomingBatchConnection(BatchTransportReader trans,
ConnectionReaderFactory connFactory, DatabaseComponent db,
ProtocolReaderFactory protoFactory, int transportId,
long connection, ContactId contactId) {
this.trans = trans;
this.connFactory = connFactory;
this.db = db;
this.protoFactory = protoFactory;
this.transportId = transportId;
this.connection = connection;
this.contactId = contactId;
}
void read() throws DbException, IOException {
byte[] secret = db.getSharedSecret(contactId);
ConnectionReader conn = connFactory.createConnectionReader(
trans.getInputStream(), false, transportId, connection, secret);
InputStream in = conn.getInputStream();
ProtocolReader proto = protoFactory.createProtocolReader(in);
// Read packets until EOF
while(!proto.eof()) {
if(proto.hasAck()) {
Ack a = proto.readAck();
db.receiveAck(contactId, a);
} else if(proto.hasBatch()) {
Batch b = proto.readBatch();
db.receiveBatch(contactId, b);
} else if(proto.hasSubscriptionUpdate()) {
SubscriptionUpdate s = proto.readSubscriptionUpdate();
db.receiveSubscriptionUpdate(contactId, s);
} else if(proto.hasTransportUpdate()) {
TransportUpdate t = proto.readTransportUpdate();
db.receiveTransportUpdate(contactId, t);
} else {
throw new FormatException();
}
}
// Close the input stream
in.close();
}
}

View File

@@ -0,0 +1,77 @@
package net.sf.briar.transport.batch;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.io.OutputStream;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.writers.AckWriter;
import net.sf.briar.api.protocol.writers.BatchWriter;
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
import net.sf.briar.api.protocol.writers.SubscriptionWriter;
import net.sf.briar.api.protocol.writers.TransportWriter;
import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.batch.BatchTransportWriter;
class OutgoingBatchConnection {
private final BatchTransportWriter trans;
private final ConnectionWriterFactory connFactory;
private final DatabaseComponent db;
private final ProtocolWriterFactory protoFactory;
private final int transportId;
private final long connection;
private final ContactId contactId;
OutgoingBatchConnection(BatchTransportWriter trans,
ConnectionWriterFactory connFactory, DatabaseComponent db,
ProtocolWriterFactory protoFactory, int transportId,
long connection, ContactId contactId) {
this.trans = trans;
this.connFactory = connFactory;
this.db = db;
this.protoFactory = protoFactory;
this.transportId = transportId;
this.connection = connection;
this.contactId = contactId;
}
void write() throws DbException, IOException {
byte[] secret = db.getSharedSecret(contactId);
ConnectionWriter conn = connFactory.createConnectionWriter(
trans.getOutputStream(), true, transportId, connection, secret);
OutputStream out = conn.getOutputStream();
// There should be enough space for a packet
long capacity = conn.getCapacity(trans.getCapacity());
if(capacity < MAX_PACKET_LENGTH) throw new IOException();
// Write a transport update
TransportWriter t = protoFactory.createTransportWriter(out);
db.generateTransportUpdate(contactId, t);
// If there's space, write a subscription update
capacity = conn.getCapacity(trans.getCapacity());
if(capacity >= MAX_PACKET_LENGTH) {
SubscriptionWriter s = protoFactory.createSubscriptionWriter(out);
db.generateSubscriptionUpdate(contactId, s);
}
// Write acks until you can't write acks no more
AckWriter a = protoFactory.createAckWriter(out);
do {
capacity = conn.getCapacity(trans.getCapacity());
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
a.setMaxPacketLength(max);
} while(db.generateAck(contactId, a));
// Write batches until you can't write batches no more
BatchWriter b = protoFactory.createBatchWriter(out);
do {
capacity = conn.getCapacity(trans.getCapacity());
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
b.setMaxPacketLength(max);
} while(db.generateBatch(contactId, b));
// Close the output stream
out.close();
}
}

View File

@@ -0,0 +1,39 @@
package net.sf.briar.transport.batch;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import net.sf.briar.api.transport.batch.BatchTransportReader;
class ByteArrayBatchTransportReader extends FilterInputStream
implements BatchTransportReader {
ByteArrayBatchTransportReader(ByteArrayInputStream in) {
super(in);
}
public InputStream getInputStream() throws IOException {
return this;
}
public void dispose() throws IOException {
// Nothing to do
}
@Override
public int read() throws IOException {
return in.read();
}
@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
}

View File

@@ -0,0 +1,50 @@
package net.sf.briar.transport.batch;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import net.sf.briar.api.transport.batch.BatchTransportWriter;
class ByteArrayBatchTransportWriter extends FilterOutputStream
implements BatchTransportWriter {
private int capacity;
ByteArrayBatchTransportWriter(ByteArrayOutputStream out, int capacity) {
super(out);
this.capacity = capacity;
}
public long getCapacity() throws IOException {
return capacity;
}
public OutputStream getOutputStream() throws IOException {
return this;
}
public void dispose() throws IOException {
// Nothing to do
}
@Override
public void write(int b) throws IOException {
if(capacity < 1) throw new IllegalArgumentException();
out.write(b);
capacity--;
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if(len > capacity) throw new IllegalArgumentException();
out.write(b, off, len);
capacity -= len;
}
}