Converted {Incoming,Outgoing}BatchConnection into Runnables.

Also changed the dispose() method of readers/writers/connections to
swallow any exceptions that occur, since the caller can't do anything
except log them.
This commit is contained in:
akwizgran
2011-10-14 16:14:29 +01:00
parent 55182528cf
commit d48c7b6900
14 changed files with 290 additions and 92 deletions

View File

@@ -0,0 +1,47 @@
package net.sf.briar.transport.batch;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
import net.sf.briar.api.transport.BatchTransportReader;
import net.sf.briar.api.transport.BatchTransportWriter;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.batch.BatchConnectionFactory;
import com.google.inject.Inject;
class BatchConnectionFactoryImpl implements BatchConnectionFactory {
private final ConnectionReaderFactory connReaderFactory;
private final ConnectionWriterFactory connWriterFactory;
private final DatabaseComponent db;
private final ProtocolReaderFactory protoReaderFactory;
private final ProtocolWriterFactory protoWriterFactory;
@Inject
BatchConnectionFactoryImpl(ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory) {
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.db = db;
this.protoReaderFactory = protoReaderFactory;
this.protoWriterFactory = protoWriterFactory;
}
public Runnable createOutgoingConnection(TransportId t, ContactId c,
BatchTransportWriter w) {
return new OutgoingBatchConnection(connWriterFactory, db,
protoWriterFactory, t, c, w);
}
public Runnable createIncomingConnection(ContactId c,
BatchTransportReader r, byte[] encryptedIv) {
return new IncomingBatchConnection(connReaderFactory, db,
protoReaderFactory, c, r, encryptedIv);
}
}

View File

@@ -1,7 +1,8 @@
package net.sf.briar.transport.batch;
import java.io.IOException;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.FormatException;
@@ -13,45 +14,67 @@ import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.transport.BatchTransportReader;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
class IncomingBatchConnection {
class IncomingBatchConnection implements Runnable {
private final ConnectionReader conn;
private static final Logger LOG =
Logger.getLogger(IncomingBatchConnection.class.getName());
private final ConnectionReaderFactory connFactory;
private final DatabaseComponent db;
private final ProtocolReaderFactory protoFactory;
private final ContactId contactId;
private final BatchTransportReader reader;
private final byte[] encryptedIv;
IncomingBatchConnection(ConnectionReader conn, DatabaseComponent db,
ProtocolReaderFactory protoFactory, ContactId contactId) {
this.conn = conn;
IncomingBatchConnection(ConnectionReaderFactory connFactory,
DatabaseComponent db, ProtocolReaderFactory protoFactory,
ContactId contactId, BatchTransportReader reader,
byte[] encryptedIv) {
this.connFactory = connFactory;
this.db = db;
this.protoFactory = protoFactory;
this.contactId = contactId;
this.reader = reader;
this.encryptedIv = encryptedIv;
}
void read() throws DbException, IOException {
InputStream in = conn.getInputStream();
ProtocolReader proto = protoFactory.createProtocolReader(in);
// Read packets until EOF
while(!proto.eof()) {
if(proto.hasAck()) {
Ack a = proto.readAck();
db.receiveAck(contactId, a);
} else if(proto.hasBatch()) {
Batch b = proto.readBatch();
db.receiveBatch(contactId, b);
} else if(proto.hasSubscriptionUpdate()) {
SubscriptionUpdate s = proto.readSubscriptionUpdate();
db.receiveSubscriptionUpdate(contactId, s);
} else if(proto.hasTransportUpdate()) {
TransportUpdate t = proto.readTransportUpdate();
db.receiveTransportUpdate(contactId, t);
} else {
throw new FormatException();
public void run() {
try {
byte[] secret = db.getSharedSecret(contactId);
ConnectionReader conn = connFactory.createConnectionReader(
reader.getInputStream(), encryptedIv, secret);
ProtocolReader proto = protoFactory.createProtocolReader(
conn.getInputStream());
// Read packets until EOF
while(!proto.eof()) {
if(proto.hasAck()) {
Ack a = proto.readAck();
db.receiveAck(contactId, a);
} else if(proto.hasBatch()) {
Batch b = proto.readBatch();
db.receiveBatch(contactId, b);
} else if(proto.hasSubscriptionUpdate()) {
SubscriptionUpdate s = proto.readSubscriptionUpdate();
db.receiveSubscriptionUpdate(contactId, s);
} else if(proto.hasTransportUpdate()) {
TransportUpdate t = proto.readTransportUpdate();
db.receiveTransportUpdate(contactId, t);
} else {
throw new FormatException();
}
}
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
reader.dispose(false);
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
reader.dispose(false);
}
// Close the input stream
in.close();
// Success
reader.dispose(true);
}
}

View File

@@ -4,8 +4,11 @@ import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.writers.AckWriter;
@@ -13,52 +16,77 @@ import net.sf.briar.api.protocol.writers.BatchWriter;
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
import net.sf.briar.api.protocol.writers.SubscriptionWriter;
import net.sf.briar.api.protocol.writers.TransportWriter;
import net.sf.briar.api.transport.BatchTransportWriter;
import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
class OutgoingBatchConnection {
class OutgoingBatchConnection implements Runnable {
private final ConnectionWriter conn;
private static final Logger LOG =
Logger.getLogger(OutgoingBatchConnection.class.getName());
private final ConnectionWriterFactory connFactory;
private final DatabaseComponent db;
private final ProtocolWriterFactory protoFactory;
private final TransportId transportId;
private final ContactId contactId;
private final BatchTransportWriter writer;
OutgoingBatchConnection(ConnectionWriter conn, DatabaseComponent db,
ProtocolWriterFactory protoFactory, ContactId contactId) {
this.conn = conn;
OutgoingBatchConnection(ConnectionWriterFactory connFactory,
DatabaseComponent db, ProtocolWriterFactory protoFactory,
TransportId transportId, ContactId contactId,
BatchTransportWriter writer) {
this.connFactory = connFactory;
this.db = db;
this.protoFactory = protoFactory;
this.transportId = transportId;
this.contactId = contactId;
this.writer = writer;
}
void write() throws DbException, IOException {
OutputStream out = conn.getOutputStream();
// There should be enough space for a packet
long capacity = conn.getRemainingCapacity();
if(capacity < MAX_PACKET_LENGTH) throw new IOException();
// Write a transport update
TransportWriter t = protoFactory.createTransportWriter(out);
db.generateTransportUpdate(contactId, t);
// If there's space, write a subscription update
capacity = conn.getRemainingCapacity();
if(capacity >= MAX_PACKET_LENGTH) {
SubscriptionWriter s = protoFactory.createSubscriptionWriter(out);
db.generateSubscriptionUpdate(contactId, s);
public void run() {
try {
byte[] secret = db.getSharedSecret(contactId);
long connection = db.getConnectionNumber(contactId, transportId);
ConnectionWriter conn = connFactory.createConnectionWriter(
writer.getOutputStream(), writer.getCapacity(), true,
transportId, connection, secret);
OutputStream out = conn.getOutputStream();
// There should be enough space for a packet
long capacity = conn.getRemainingCapacity();
if(capacity < MAX_PACKET_LENGTH) throw new IOException();
// Write a transport update
TransportWriter t = protoFactory.createTransportWriter(out);
db.generateTransportUpdate(contactId, t);
// If there's space, write a subscription update
capacity = conn.getRemainingCapacity();
if(capacity >= MAX_PACKET_LENGTH) {
SubscriptionWriter s =
protoFactory.createSubscriptionWriter(out);
db.generateSubscriptionUpdate(contactId, s);
}
// Write acks until you can't write acks no more
AckWriter a = protoFactory.createAckWriter(out);
do {
capacity = conn.getRemainingCapacity();
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
a.setMaxPacketLength(max);
} while(db.generateAck(contactId, a));
// Write batches until you can't write batches no more
BatchWriter b = protoFactory.createBatchWriter(out);
do {
capacity = conn.getRemainingCapacity();
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
b.setMaxPacketLength(max);
} while(db.generateBatch(contactId, b));
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
writer.dispose(false);
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
writer.dispose(false);
}
// Write acks until you can't write acks no more
AckWriter a = protoFactory.createAckWriter(out);
do {
capacity = conn.getRemainingCapacity();
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
a.setMaxPacketLength(max);
} while(db.generateAck(contactId, a));
// Write batches until you can't write batches no more
BatchWriter b = protoFactory.createBatchWriter(out);
do {
capacity = conn.getRemainingCapacity();
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
b.setMaxPacketLength(max);
} while(db.generateBatch(contactId, b));
// Close the output stream
out.close();
// Success
writer.dispose(true);
}
}

View File

@@ -0,0 +1,15 @@
package net.sf.briar.transport.batch;
import net.sf.briar.api.transport.batch.BatchConnectionFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
public class TransportBatchModule extends AbstractModule {
@Override
protected void configure() {
bind(BatchConnectionFactory.class).to(
BatchConnectionFactoryImpl.class).in(Singleton.class);
}
}