StreamConnectionFactory and ConnectionDispatcherImpl (untested).

This commit is contained in:
akwizgran
2011-10-14 21:07:49 +01:00
parent 718bd8c540
commit 0d11553134
14 changed files with 441 additions and 36 deletions

View File

@@ -2,8 +2,13 @@ package net.sf.briar.api.transport;
import java.io.InputStream;
import net.sf.briar.api.TransportId;
public interface ConnectionReaderFactory {
ConnectionReader createConnectionReader(InputStream in, byte[] encryptedIv,
byte[] secret);
ConnectionReader createConnectionReader(InputStream in, boolean initiator,
TransportId t, long connection, byte[] secret);
}

View File

@@ -6,7 +6,9 @@ import net.sf.briar.api.TransportId;
public interface ConnectionWriterFactory {
ConnectionWriter createConnectionWriter(OutputStream out,
long capacity, boolean initiator, TransportId t, long connection,
byte[] secret);
ConnectionWriter createConnectionWriter(OutputStream out, long capacity,
boolean initiator, TransportId t, long connection, byte[] secret);
ConnectionWriter createConnectionWriter(OutputStream out, long capacity,
byte[] encryptedIv, byte[] secret);
}

View File

@@ -7,9 +7,9 @@ import net.sf.briar.api.transport.BatchTransportWriter;
public interface BatchConnectionFactory {
Runnable createOutgoingConnection(TransportId t, ContactId c,
BatchTransportWriter w);
Runnable createIncomingConnection(ContactId c, BatchTransportReader r,
byte[] encryptedIv);
Runnable createOutgoingConnection(TransportId t, ContactId c,
BatchTransportWriter w);
}

View File

@@ -0,0 +1,14 @@
package net.sf.briar.api.transport.stream;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.transport.StreamTransportConnection;
public interface StreamConnectionFactory {
Runnable[] createIncomingConnection(ContactId c,
StreamTransportConnection s, byte[] encryptedIv);
Runnable[] createOutgoingConnection(TransportId t, ContactId c,
StreamTransportConnection s);
}

View File

@@ -48,6 +48,16 @@ implements ConnectionDecrypter {
buf = new byte[IV_LENGTH];
}
ConnectionDecrypterImpl(InputStream in, byte[] iv, Cipher frameCipher,
SecretKey frameKey) {
super(in);
if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
this.iv = iv;
this.frameCipher = frameCipher;
this.frameKey = frameKey;
buf = new byte[IV_LENGTH];
}
public InputStream getInputStream() {
return this;
}

View File

@@ -0,0 +1,142 @@
package net.sf.briar.transport;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.transport.BatchTransportReader;
import net.sf.briar.api.transport.BatchTransportWriter;
import net.sf.briar.api.transport.ConnectionDispatcher;
import net.sf.briar.api.transport.ConnectionRecogniser;
import net.sf.briar.api.transport.ConnectionRecogniserFactory;
import net.sf.briar.api.transport.StreamTransportConnection;
import net.sf.briar.api.transport.TransportConstants;
import net.sf.briar.api.transport.batch.BatchConnectionFactory;
import net.sf.briar.api.transport.stream.StreamConnectionFactory;
public class ConnectionDispatcherImpl implements ConnectionDispatcher {
private static final Logger LOG =
Logger.getLogger(ConnectionDispatcherImpl.class.getName());
private final Executor executor;
private final ConnectionRecogniserFactory recFactory;
private final BatchConnectionFactory batchConnFactory;
private final StreamConnectionFactory streamConnFactory;
private final Map<TransportId, ConnectionRecogniser> recognisers;
ConnectionDispatcherImpl(Executor executor,
ConnectionRecogniserFactory recFactory,
BatchConnectionFactory batchConnFactory,
StreamConnectionFactory streamConnFactory) {
this.executor = executor;
this.recFactory = recFactory;
this.batchConnFactory = batchConnFactory;
this.streamConnFactory = streamConnFactory;
recognisers = new HashMap<TransportId, ConnectionRecogniser>();
}
public void dispatchReader(TransportId t, BatchTransportReader r) {
// Read the encrypted IV
byte[] encryptedIv;
try {
encryptedIv = readIv(r.getInputStream());
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
r.dispose(false);
return;
}
// Get the contact ID, or null if the IV wasn't expected
ContactId c;
try {
ConnectionRecogniser rec = getRecogniser(t);
c = rec.acceptConnection(encryptedIv);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
r.dispose(false);
return;
}
if(c == null) {
r.dispose(false);
return;
}
// Pass the connection to the executor and return
executor.execute(batchConnFactory.createIncomingConnection(c, r,
encryptedIv));
}
private byte[] readIv(InputStream in) throws IOException {
byte[] b = new byte[TransportConstants.IV_LENGTH];
int offset = 0;
while(offset < b.length) {
int read = in.read(b, offset, b.length - offset);
if(read == -1) throw new IOException();
offset += read;
}
return b;
}
private ConnectionRecogniser getRecogniser(TransportId t) {
synchronized(recognisers) {
ConnectionRecogniser rec = recognisers.get(t);
if(rec == null) {
rec = recFactory.createConnectionRecogniser(t);
recognisers.put(t, rec);
}
return rec;
}
}
public void dispatchWriter(TransportId t, ContactId c,
BatchTransportWriter w) {
executor.execute(batchConnFactory.createOutgoingConnection(t, c, w));
}
public void dispatchIncomingConnection(TransportId t,
StreamTransportConnection s) {
// Read the encrypted IV
byte[] encryptedIv;
try {
encryptedIv = readIv(s.getInputStream());
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
s.dispose(false);
return;
}
// Get the contact ID, or null if the IV wasn't expected
ContactId c;
try {
ConnectionRecogniser rec = getRecogniser(t);
c = rec.acceptConnection(encryptedIv);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
s.dispose(false);
return;
}
if(c == null) {
s.dispose(false);
return;
}
// Pass the connection to the executor and return
Runnable[] r = streamConnFactory.createIncomingConnection(c, s,
encryptedIv);
assert r.length == 2;
executor.execute(r[0]); // Write
executor.execute(r[1]); // Read
}
public void dispatchOutgoingConnection(TransportId t, ContactId c,
StreamTransportConnection s) {
Runnable[] r = streamConnFactory.createOutgoingConnection(t, c, s);
assert r.length == 2;
executor.execute(r[0]); // Write
executor.execute(r[1]); // Read
}
}

View File

@@ -6,6 +6,7 @@ import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
@@ -35,4 +36,18 @@ class ConnectionReaderFactoryImpl implements ConnectionReaderFactory {
SecretKey macKey = crypto.deriveIncomingMacKey(secret);
return new ConnectionReaderImpl(decrypter, mac, macKey);
}
public ConnectionReader createConnectionReader(InputStream in,
boolean initiator, TransportId t, long connection, byte[] secret) {
byte[] iv = IvEncoder.encodeIv(initiator, t, connection);
// Create the decrypter
Cipher frameCipher = crypto.getFrameCipher();
SecretKey frameKey = crypto.deriveIncomingFrameKey(secret);
ConnectionDecrypter decrypter = new ConnectionDecrypterImpl(in, iv,
frameCipher, frameKey);
// Create the reader
Mac mac = crypto.getMac();
SecretKey macKey = crypto.deriveIncomingMacKey(secret);
return new ConnectionReaderImpl(decrypter, mac, macKey);
}
}

View File

@@ -1,8 +1,11 @@
package net.sf.briar.transport;
import java.io.OutputStream;
import java.security.InvalidKeyException;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
@@ -38,4 +41,27 @@ class ConnectionWriterFactoryImpl implements ConnectionWriterFactory {
SecretKey macKey = crypto.deriveOutgoingMacKey(secret);
return new ConnectionWriterImpl(encrypter, mac, macKey);
}
public ConnectionWriter createConnectionWriter(OutputStream out,
long capacity, byte[] encryptedIv, byte[] secret) {
// Decrypt the IV
Cipher ivCipher = crypto.getIvCipher();
SecretKey ivKey = crypto.deriveIncomingIvKey(secret);
byte[] iv;
try {
ivCipher.init(Cipher.DECRYPT_MODE, ivKey);
iv = ivCipher.doFinal(encryptedIv);
} catch(BadPaddingException badCipher) {
throw new RuntimeException(badCipher);
} catch(IllegalBlockSizeException badCipher) {
throw new RuntimeException(badCipher);
} catch(InvalidKeyException badKey) {
throw new RuntimeException(badKey);
}
boolean initiator = IvEncoder.getInitiatorFlag(iv);
TransportId t = new TransportId(IvEncoder.getTransportId(iv));
long connection = IvEncoder.getConnectionNumber(iv);
return createConnectionWriter(out, capacity, initiator, t, connection,
secret);
}
}

View File

@@ -23,4 +23,19 @@ class IvEncoder {
// Encode the frame number as an unsigned 32-bit integer
ByteUtils.writeUint32(frame, iv, 10);
}
static boolean getInitiatorFlag(byte[] iv) {
if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
return (iv[3] & 1) == 1;
}
static int getTransportId(byte[] iv) {
if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
return ByteUtils.readUint16(iv, 4);
}
static long getConnectionNumber(byte[] iv) {
if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
return ByteUtils.readUint32(iv, 6);
}
}

View File

@@ -33,15 +33,15 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
this.protoWriterFactory = protoWriterFactory;
}
public Runnable createOutgoingConnection(TransportId t, ContactId c,
BatchTransportWriter w) {
return new OutgoingBatchConnection(connWriterFactory, db,
protoWriterFactory, t, c, w);
}
public Runnable createIncomingConnection(ContactId c,
BatchTransportReader r, byte[] encryptedIv) {
return new IncomingBatchConnection(connReaderFactory, db,
protoReaderFactory, c, r, encryptedIv);
}
public Runnable createOutgoingConnection(TransportId t, ContactId c,
BatchTransportWriter w) {
return new OutgoingBatchConnection(connWriterFactory, db,
protoWriterFactory, t, c, w);
}
}

View File

@@ -0,0 +1,46 @@
package net.sf.briar.transport.stream;
import java.io.IOException;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.StreamTransportConnection;
public class IncomingStreamConnection extends StreamConnection {
private final byte[] encryptedIv;
IncomingStreamConnection(ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
StreamTransportConnection connection, byte[] encryptedIv) {
super(connReaderFactory, connWriterFactory, db, protoReaderFactory,
protoWriterFactory, contactId, connection);
this.encryptedIv = encryptedIv;
}
@Override
protected ConnectionReader createConnectionReader() throws DbException,
IOException {
byte[] secret = db.getSharedSecret(contactId);
return connReaderFactory.createConnectionReader(
connection.getInputStream(), encryptedIv, secret);
}
@Override
protected ConnectionWriter createConnectionWriter() throws DbException,
IOException {
byte[] secret = db.getSharedSecret(contactId);
return connWriterFactory.createConnectionWriter(
connection.getOutputStream(), Long.MAX_VALUE, encryptedIv,
secret);
}
}

View File

@@ -0,0 +1,54 @@
package net.sf.briar.transport.stream;
import java.io.IOException;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.StreamTransportConnection;
public class OutgoingStreamConnection extends StreamConnection {
private final TransportId transportId;
private long connectionNum = -1L;
OutgoingStreamConnection(ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
StreamTransportConnection connection, TransportId transportId) {
super(connReaderFactory, connWriterFactory, db, protoReaderFactory,
protoWriterFactory, contactId, connection);
this.transportId = transportId;
}
@Override
protected ConnectionReader createConnectionReader() throws DbException,
IOException {
if(connectionNum == -1L)
connectionNum = db.getConnectionNumber(contactId, transportId);
byte[] secret = db.getSharedSecret(contactId);
return connReaderFactory.createConnectionReader(
connection.getInputStream(), false, transportId, connectionNum,
secret);
}
@Override
protected ConnectionWriter createConnectionWriter() throws DbException,
IOException {
if(connectionNum == -1L)
connectionNum = db.getConnectionNumber(contactId, transportId);
byte[] secret = db.getSharedSecret(contactId);
return connWriterFactory.createConnectionWriter(
connection.getOutputStream(), Long.MAX_VALUE, true, transportId,
connectionNum, secret);
}
}

View File

@@ -37,7 +37,7 @@ import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.StreamTransportConnection;
abstract class StreamConnection implements Runnable, DatabaseListener {
abstract class StreamConnection implements DatabaseListener {
private static enum State { SEND_OFFER, IDLE, AWAIT_REQUEST, SEND_BATCHES };
@@ -113,8 +113,8 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
notifyAll();
}
} else if(proto.hasRequest()) {
Collection<MessageId> offered, seen, unseen;
Request r = proto.readRequest();
Collection<MessageId> offered, seen, unseen;
synchronized(this) {
if(outgoingOffer == null)
throw new IOException("Unexpected request packet");
@@ -200,23 +200,21 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
flags = writerFlags;
writerFlags = 0;
}
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
// Handle the flags in approximate order of urgency
if((flags & Flags.CONTACTS_UPDATED) != 0) {
if(!db.getContacts().contains(contactId)) {
close = true;
break;
}
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
state = State.SEND_OFFER;
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransports(transportWriter);
}
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
sendSubscriptions(subscriptionWriter);
}
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransports(transportWriter);
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
if((flags & Flags.OFFER_RECEIVED) != 0) {
sendRequest(requestWriter);
@@ -224,6 +222,9 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
if((flags & Flags.REQUEST_RECEIVED) != 0) {
throw new IOException("Unexpected request packet");
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
state = State.SEND_OFFER;
}
break;
case AWAIT_REQUEST:
@@ -237,23 +238,21 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
flags = writerFlags;
writerFlags = 0;
}
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
// Handle the flags in approximate order of urgency
if((flags & Flags.CONTACTS_UPDATED) != 0) {
if(!db.getContacts().contains(contactId)) {
close = true;
break;
}
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
// Ignored in this state
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransports(transportWriter);
}
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
sendSubscriptions(subscriptionWriter);
}
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransports(transportWriter);
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
if((flags & Flags.OFFER_RECEIVED) != 0) {
sendRequest(requestWriter);
@@ -261,31 +260,32 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
if((flags & Flags.REQUEST_RECEIVED) != 0) {
state = State.SEND_BATCHES;
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
// Ignored in this state
}
break;
case SEND_BATCHES:
// Deal with any flags that have been raised
// Check whether any flags have been raised
synchronized(this) {
flags = writerFlags;
writerFlags = 0;
}
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
// Handle the flags in approximate order of urgency
if((flags & Flags.CONTACTS_UPDATED) != 0) {
if(!db.getContacts().contains(contactId)) {
close = true;
break;
}
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
// Ignored in this state
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransports(transportWriter);
}
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
sendSubscriptions(subscriptionWriter);
}
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransports(transportWriter);
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
if((flags & Flags.OFFER_RECEIVED) != 0) {
sendRequest(requestWriter);
@@ -293,6 +293,9 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
if((flags & Flags.REQUEST_RECEIVED) != 0) {
throw new IOException("Unexpected request packet");
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
// Ignored in this state
}
// Send a batch if possible, otherwise an offer
if(!sendBatch(batchWriter)) state = State.SEND_OFFER;
break;

View File

@@ -0,0 +1,73 @@
package net.sf.briar.transport.stream;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.StreamTransportConnection;
import net.sf.briar.api.transport.stream.StreamConnectionFactory;
import com.google.inject.Inject;
public class StreamConnectionFactoryImpl implements StreamConnectionFactory {
private final ConnectionReaderFactory connReaderFactory;
private final ConnectionWriterFactory connWriterFactory;
private final DatabaseComponent db;
private final ProtocolReaderFactory protoReaderFactory;
private final ProtocolWriterFactory protoWriterFactory;
@Inject
StreamConnectionFactoryImpl(ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory) {
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.db = db;
this.protoReaderFactory = protoReaderFactory;
this.protoWriterFactory = protoWriterFactory;
}
public Runnable[] createIncomingConnection(ContactId c,
StreamTransportConnection s, byte[] encryptedIv) {
final StreamConnection conn = new IncomingStreamConnection(
connReaderFactory, connWriterFactory, db, protoReaderFactory,
protoWriterFactory, c, s, encryptedIv);
Runnable[] runnables = new Runnable[2];
runnables[0] = new Runnable() {
public void run() {
conn.write();
}
};
runnables[1] = new Runnable() {
public void run() {
conn.read();
}
};
return runnables;
}
public Runnable[] createOutgoingConnection(TransportId t, ContactId c,
StreamTransportConnection s) {
final StreamConnection conn = new OutgoingStreamConnection(
connReaderFactory, connWriterFactory, db, protoReaderFactory,
protoWriterFactory, c, s, t);
Runnable[] runnables = new Runnable[2];
runnables[0] = new Runnable() {
public void run() {
conn.write();
}
};
runnables[1] = new Runnable() {
public void run() {
conn.read();
}
};
return runnables;
}
}