Massive refactoring to merge handling of simplex and duplex connections.

This commit is contained in:
akwizgran
2014-11-04 16:51:25 +00:00
parent b24f153704
commit 7b8181e309
67 changed files with 1981 additions and 2288 deletions

View File

@@ -76,14 +76,10 @@ class CryptoComponentImpl implements CryptoComponent {
// Labels for key derivation
private static final byte[] A_TAG = { 'A', '_', 'T', 'A', 'G', '\0' };
private static final byte[] B_TAG = { 'B', '_', 'T', 'A', 'G', '\0' };
private static final byte[] A_FRAME_A =
{ 'A', '_', 'F', 'R', 'A', 'M', 'E', '_', 'A', '\0' };
private static final byte[] A_FRAME_B =
{ 'A', '_', 'F', 'R', 'A', 'M', 'E', '_', 'B', '\0' };
private static final byte[] B_FRAME_A =
{ 'B', '_', 'F', 'R', 'A', 'M', 'E', '_', 'A', '\0' };
private static final byte[] B_FRAME_B =
{ 'B', '_', 'F', 'R', 'A', 'M', 'E', '_', 'B', '\0' };
private static final byte[] A_FRAME =
{ 'A', '_', 'F', 'R', 'A', 'M', 'E', '\0' };
private static final byte[] B_FRAME =
{ 'B', '_', 'F', 'R', 'A', 'M', 'E', '\0' };
// Blank secret for argument validation
private static final byte[] BLANK_SECRET = new byte[CIPHER_KEY_BYTES];
@@ -288,20 +284,15 @@ class CryptoComponentImpl implements CryptoComponent {
}
public SecretKey deriveFrameKey(byte[] secret, long streamNumber,
boolean alice, boolean initiator) {
boolean alice) {
if(secret.length != CIPHER_KEY_BYTES)
throw new IllegalArgumentException();
if(Arrays.equals(secret, BLANK_SECRET))
throw new IllegalArgumentException();
if(streamNumber < 0 || streamNumber > MAX_32_BIT_UNSIGNED)
throw new IllegalArgumentException();
if(alice) {
if(initiator) return deriveKey(secret, A_FRAME_A, streamNumber);
else return deriveKey(secret, A_FRAME_B, streamNumber);
} else {
if(initiator) return deriveKey(secret, B_FRAME_A, streamNumber);
else return deriveKey(secret, B_FRAME_B, streamNumber);
}
if(alice) return deriveKey(secret, A_FRAME, streamNumber);
else return deriveKey(secret, B_FRAME, streamNumber);
}
private SecretKey deriveKey(byte[] secret, byte[] label, long context) {

View File

@@ -75,8 +75,8 @@ class AliceConnector extends Connector {
Writer w;
byte[] secret;
try {
in = conn.getInputStream();
out = conn.getOutputStream();
in = conn.getReader().getInputStream();
out = conn.getWriter().getOutputStream();
r = readerFactory.createReader(in);
w = writerFactory.createWriter(out);
// Alice goes first
@@ -130,7 +130,7 @@ class AliceConnector extends Connector {
// Confirmation succeeded - upgrade to a secure connection
if(LOG.isLoggable(INFO))
LOG.info(pluginName + " confirmation succeeded");
int maxFrameLength = conn.getMaxFrameLength();
int maxFrameLength = conn.getReader().getMaxFrameLength();
StreamReader streamReader =
streamReaderFactory.createInvitationStreamReader(in,
maxFrameLength, secret, false);

View File

@@ -69,8 +69,8 @@ class BobConnector extends Connector {
Writer w;
byte[] secret;
try {
in = conn.getInputStream();
out = conn.getOutputStream();
in = conn.getReader().getInputStream();
out = conn.getWriter().getOutputStream();
r = readerFactory.createReader(in);
w = writerFactory.createWriter(out);
// Alice goes first
@@ -130,7 +130,7 @@ class BobConnector extends Connector {
// Confirmation succeeded - upgrade to a secure connection
if(LOG.isLoggable(INFO))
LOG.info(pluginName + " confirmation succeeded");
int maxFrameLength = conn.getMaxFrameLength();
int maxFrameLength = conn.getReader().getMaxFrameLength();
StreamReader streamReader =
streamReaderFactory.createInvitationStreamReader(in,
maxFrameLength, secret, true);

View File

@@ -311,7 +311,8 @@ abstract class Connector extends Thread {
boolean exception) {
try {
LOG.info("Closing connection");
conn.dispose(exception, true);
conn.getReader().dispose(exception, true);
conn.getWriter().dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}

View File

@@ -1,4 +1,4 @@
package org.briarproject.messaging.simplex;
package org.briarproject.messaging;
import static java.util.logging.Level.WARNING;
@@ -10,12 +10,12 @@ import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.RetentionAck;
@@ -25,103 +25,91 @@ import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.plugins.TransportConnectionReader;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReader;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.util.ByteUtils;
class IncomingSimplexConnection {
/**
* An incoming {@link org.briarproject.api.messaging.MessagingSession
* MessagingSession}.
*/
class IncomingSession implements MessagingSession {
private static final Logger LOG =
Logger.getLogger(IncomingSimplexConnection.class.getName());
Logger.getLogger(IncomingSession.class.getName());
private final DatabaseComponent db;
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final ConnectionRegistry connRegistry;
private final StreamReaderFactory connReaderFactory;
private final StreamReaderFactory streamReaderFactory;
private final PacketReaderFactory packetReaderFactory;
private final StreamContext ctx;
private final SimplexTransportReader transport;
private final TransportConnectionReader transportReader;
private final ContactId contactId;
private final TransportId transportId;
IncomingSimplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
StreamReaderFactory connReaderFactory,
private volatile boolean interrupted = false;
IncomingSession(DatabaseComponent db, Executor dbExecutor,
Executor cryptoExecutor, MessageVerifier messageVerifier,
StreamReaderFactory streamReaderFactory,
PacketReaderFactory packetReaderFactory, StreamContext ctx,
SimplexTransportReader transport) {
TransportConnectionReader transportReader) {
this.db = db;
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.streamReaderFactory = streamReaderFactory;
this.packetReaderFactory = packetReaderFactory;
this.ctx = ctx;
this.transport = transport;
this.transportReader = transportReader;
contactId = ctx.getContactId();
transportId = ctx.getTransportId();
}
void read() {
connRegistry.registerConnection(contactId, transportId);
try {
InputStream in = transport.getInputStream();
int maxFrameLength = transport.getMaxFrameLength();
StreamReader conn = connReaderFactory.createStreamReader(in,
maxFrameLength, ctx, true, true);
in = conn.getInputStream();
PacketReader reader = packetReaderFactory.createPacketReader(in);
// Read packets until EOF
while(!reader.eof()) {
if(reader.hasAck()) {
Ack a = reader.readAck();
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage();
cryptoExecutor.execute(new VerifyMessage(m));
} else if(reader.hasRetentionAck()) {
RetentionAck a = reader.readRetentionAck();
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if(reader.hasRetentionUpdate()) {
RetentionUpdate u = reader.readRetentionUpdate();
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if(reader.hasSubscriptionAck()) {
SubscriptionAck a = reader.readSubscriptionAck();
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportAck()) {
TransportAck a = reader.readTransportAck();
dbExecutor.execute(new ReceiveTransportAck(a));
} else if(reader.hasTransportUpdate()) {
TransportUpdate u = reader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
public void run() throws IOException {
InputStream in = transportReader.getInputStream();
int maxFrameLength = transportReader.getMaxFrameLength();
StreamReader streamReader = streamReaderFactory.createStreamReader(in,
maxFrameLength, ctx);
in = streamReader.getInputStream();
PacketReader packetReader = packetReaderFactory.createPacketReader(in);
// Read packets until interrupted or EOF
while(!interrupted && !packetReader.eof()) {
if(packetReader.hasAck()) {
Ack a = packetReader.readAck();
dbExecutor.execute(new ReceiveAck(a));
} else if(packetReader.hasMessage()) {
UnverifiedMessage m = packetReader.readMessage();
cryptoExecutor.execute(new VerifyMessage(m));
} else if(packetReader.hasRetentionAck()) {
RetentionAck a = packetReader.readRetentionAck();
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if(packetReader.hasRetentionUpdate()) {
RetentionUpdate u = packetReader.readRetentionUpdate();
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if(packetReader.hasSubscriptionAck()) {
SubscriptionAck a = packetReader.readSubscriptionAck();
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if(packetReader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = packetReader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(packetReader.hasTransportAck()) {
TransportAck a = packetReader.readTransportAck();
dbExecutor.execute(new ReceiveTransportAck(a));
} else if(packetReader.hasTransportUpdate()) {
TransportUpdate u = packetReader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
dispose(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
} finally {
connRegistry.unregisterConnection(contactId, transportId);
}
in.close();
}
private void dispose(boolean exception, boolean recognised) {
ByteUtils.erase(ctx.getSecret());
try {
transport.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
public void interrupt() {
// This won't interrupt a blocking read, but the read will throw an
// exception when the transport connection is closed
interrupted = true;
}
private class ReceiveAck implements Runnable {

View File

@@ -1,5 +1,7 @@
package org.briarproject.messaging;
import javax.inject.Singleton;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorFactory;
import org.briarproject.api.crypto.CryptoComponent;
@@ -9,6 +11,7 @@ import org.briarproject.api.messaging.MessageFactory;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.MessagingSessionFactory;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.serial.StructReader;
@@ -18,6 +21,7 @@ import com.google.inject.Provides;
public class MessagingModule extends AbstractModule {
@Override
protected void configure() {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
bind(GroupFactory.class).to(GroupFactoryImpl.class);
@@ -25,6 +29,8 @@ public class MessagingModule extends AbstractModule {
bind(MessageVerifier.class).to(MessageVerifierImpl.class);
bind(PacketReaderFactory.class).to(PacketReaderFactoryImpl.class);
bind(PacketWriterFactory.class).to(PacketWriterFactoryImpl.class);
bind(MessagingSessionFactory.class).to(
MessagingSessionFactoryImpl.class).in(Singleton.class);
}
@Provides

View File

@@ -0,0 +1,67 @@
package org.briarproject.messaging;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.MessagingSessionFactory;
import org.briarproject.api.plugins.TransportConnectionReader;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriterFactory;
class MessagingSessionFactoryImpl implements MessagingSessionFactory {
private final DatabaseComponent db;
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final EventBus eventBus;
private final StreamReaderFactory streamReaderFactory;
private final StreamWriterFactory streamWriterFactory;
private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory;
@Inject
MessagingSessionFactoryImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, EventBus eventBus,
StreamReaderFactory streamReaderFactory,
StreamWriterFactory streamWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) {
this.db = db;
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.eventBus = eventBus;
this.streamReaderFactory = streamReaderFactory;
this.streamWriterFactory = streamWriterFactory;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
}
public MessagingSession createIncomingSession(StreamContext ctx,
TransportConnectionReader r) {
return new IncomingSession(db, dbExecutor, cryptoExecutor,
messageVerifier, streamReaderFactory, packetReaderFactory,
ctx, r);
}
public MessagingSession createOutgoingSession(StreamContext ctx,
TransportConnectionWriter w, boolean duplex) {
if(duplex) return new ReactiveOutgoingSession(db, dbExecutor, eventBus,
streamWriterFactory, packetWriterFactory, ctx, w);
else return new SinglePassOutgoingSession(db, dbExecutor,
streamWriterFactory, packetWriterFactory, ctx, w);
}
}

View File

@@ -0,0 +1,518 @@
package org.briarproject.messaging;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageExpiredEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.RemoteRetentionTimeUpdatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.Offer;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.Request;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamWriter;
import org.briarproject.api.transport.StreamWriterFactory;
/**
* An outgoing {@link org.briarproject.api.messaging.MessagingSession
* MessagingSession} that keeps its output stream open and reacts to events
* that make packets available to send.
*/
class ReactiveOutgoingSession implements MessagingSession, EventListener {
private static final Logger LOG =
Logger.getLogger(ReactiveOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE =
new ThrowingRunnable<IOException>() {
public void run() {}
};
private final DatabaseComponent db;
private final Executor dbExecutor;
private final EventBus eventBus;
private final StreamWriterFactory streamWriterFactory;
private final PacketWriterFactory packetWriterFactory;
private final StreamContext ctx;
private final TransportConnectionWriter transportWriter;
private final ContactId contactId;
private final long maxLatency;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private volatile PacketWriter packetWriter = null;
private volatile boolean interrupted = false;
ReactiveOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, StreamWriterFactory streamWriterFactory,
PacketWriterFactory packetWriterFactory, StreamContext ctx,
TransportConnectionWriter transportWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.streamWriterFactory = streamWriterFactory;
this.packetWriterFactory = packetWriterFactory;
this.ctx = ctx;
this.transportWriter = transportWriter;
contactId = ctx.getContactId();
maxLatency = transportWriter.getMaxLatency();
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
public void run() throws IOException {
eventBus.addListener(this);
try {
OutputStream out = transportWriter.getOutputStream();
int maxFrameLength = transportWriter.getMaxFrameLength();
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
out, maxFrameLength, ctx);
out = streamWriter.getOutputStream();
packetWriter = packetWriterFactory.createPacketWriter(out, true);
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
dbExecutor.execute(new GenerateRequest());
// Write packets until interrupted
try {
while(!interrupted) {
ThrowingRunnable<IOException> task = writerTasks.take();
if(task == CLOSE) break;
task.run();
}
out.flush();
out.close();
} catch(InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt();
}
} finally {
eventBus.removeListener(this);
}
}
public void interrupt() {
interrupted = true;
writerTasks.add(CLOSE);
}
public void eventOccurred(Event e) {
if(e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if(contactId.equals(c.getContactId())) {
LOG.info("Contact removed, closing");
interrupt();
}
} else if(e instanceof MessageAddedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof MessageExpiredEvent) {
dbExecutor.execute(new GenerateRetentionUpdate());
} else if(e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
if(l.getAffectedContacts().contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateOffer());
}
} else if(e instanceof LocalTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdates());
} else if(e instanceof MessageRequestedEvent) {
if(((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch());
} else if(e instanceof MessageToAckEvent) {
if(((MessageToAckEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateAck());
} else if(e instanceof MessageToRequestEvent) {
if(((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest());
} else if(e instanceof RemoteRetentionTimeUpdatedEvent) {
dbExecutor.execute(new GenerateRetentionAck());
} else if(e instanceof RemoteSubscriptionsUpdatedEvent) {
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof RemoteTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportAcks());
} else if(e instanceof TransportRemovedEvent) {
TransportRemovedEvent t = (TransportRemovedEvent) e;
if(ctx.getTransportId().equals(t.getTransportId())) {
LOG.info("Transport removed, closing");
interrupt();
}
}
}
// This task runs on the database thread
private class GenerateAck implements Runnable {
public void run() {
int maxMessages = packetWriter.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if(a != null) writerTasks.add(new WriteAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteAck implements ThrowingRunnable<IOException> {
private final Ack ack;
private WriteAck(Ack ack) {
this.ack = ack;
}
public void run() throws IOException {
packetWriter.writeAck(ack);
LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck());
}
}
// This task runs on the database thread
private class GenerateBatch implements Runnable {
public void run() {
try {
Collection<byte[]> b = db.generateRequestedBatch(contactId,
MAX_PACKET_LENGTH, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if(b != null) writerTasks.add(new WriteBatch(b));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch;
private WriteBatch(Collection<byte[]> batch) {
this.batch = batch;
}
public void run() throws IOException {
for(byte[] raw : batch) packetWriter.writeMessage(raw);
LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
}
}
// This task runs on the database thread
private class GenerateOffer implements Runnable {
public void run() {
int maxMessages = packetWriter.getMaxMessagesForOffer(
Long.MAX_VALUE);
try {
Offer o = db.generateOffer(contactId, maxMessages, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if(o != null) writerTasks.add(new WriteOffer(o));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteOffer implements ThrowingRunnable<IOException> {
private final Offer offer;
private WriteOffer(Offer offer) {
this.offer = offer;
}
public void run() throws IOException {
packetWriter.writeOffer(offer);
LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer());
}
}
// This task runs on the database thread
private class GenerateRequest implements Runnable {
public void run() {
int maxMessages = packetWriter.getMaxMessagesForRequest(
Long.MAX_VALUE);
try {
Request r = db.generateRequest(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if(r != null) writerTasks.add(new WriteRequest(r));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRequest implements ThrowingRunnable<IOException> {
private final Request request;
private WriteRequest(Request request) {
this.request = request;
}
public void run() throws IOException {
packetWriter.writeRequest(request);
LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest());
}
}
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if(a != null) writerTasks.add(new WriteRetentionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements ThrowingRunnable<IOException> {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
}
}
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if(u != null) writerTasks.add(new WriteRetentionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate
implements ThrowingRunnable<IOException> {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() throws IOException {
packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if(a != null) writerTasks.add(new WriteSubscriptionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck
implements ThrowingRunnable<IOException> {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
}
}
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate
implements ThrowingRunnable<IOException> {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() throws IOException {
packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if(acks != null) writerTasks.add(new WriteTransportAcks(acks));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements ThrowingRunnable<IOException> {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() throws IOException {
for(TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
}
}
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if(t != null) writerTasks.add(new WriteTransportUpdates(t));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates
implements ThrowingRunnable<IOException> {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() throws IOException {
for(TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
}
}
}

View File

@@ -0,0 +1,395 @@
package org.briarproject.messaging;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamWriter;
import org.briarproject.api.transport.StreamWriterFactory;
/**
* An outgoing {@link org.briarproject.api.messaging.MessagingSession
* MessagingSession} that closes its output stream when no more packets are
* available to send.
*/
class SinglePassOutgoingSession implements MessagingSession {
private static final Logger LOG =
Logger.getLogger(SinglePassOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE =
new ThrowingRunnable<IOException>() {
public void run() {}
};
private final DatabaseComponent db;
private final Executor dbExecutor;
private final StreamWriterFactory streamWriterFactory;
private final PacketWriterFactory packetWriterFactory;
private final StreamContext ctx;
private final TransportConnectionWriter transportWriter;
private final ContactId contactId;
private final long maxLatency;
private final AtomicInteger outstandingQueries;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
private volatile StreamWriter streamWriter = null;
private volatile PacketWriter packetWriter = null;
private volatile boolean interrupted = false;
SinglePassOutgoingSession(DatabaseComponent db, Executor dbExecutor,
StreamWriterFactory streamWriterFactory,
PacketWriterFactory packetWriterFactory, StreamContext ctx,
TransportConnectionWriter transportWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.streamWriterFactory = streamWriterFactory;
this.packetWriterFactory = packetWriterFactory;
this.ctx = ctx;
this.transportWriter = transportWriter;
contactId = ctx.getContactId();
maxLatency = transportWriter.getMaxLatency();
outstandingQueries = new AtomicInteger(8); // One per type of packet
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
public void run() throws IOException {
OutputStream out = transportWriter.getOutputStream();
int maxFrameLength = transportWriter.getMaxFrameLength();
streamWriter = streamWriterFactory.createStreamWriter(out,
maxFrameLength, ctx);
out = streamWriter.getOutputStream();
packetWriter = packetWriterFactory.createPacketWriter(out, false);
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
// Write packets until interrupted or there are no more packets to write
try {
while(!interrupted) {
ThrowingRunnable<IOException> task = writerTasks.take();
if(task == CLOSE) break;
task.run();
}
out.flush();
out.close();
} catch(InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt();
}
}
public void interrupt() {
interrupted = true;
writerTasks.add(CLOSE);
}
private void decrementOutstandingQueries() {
if(outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE);
}
// This task runs on the database thread
private class GenerateAck implements Runnable {
public void run() {
int maxMessages = packetWriter.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if(a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteAck implements ThrowingRunnable<IOException> {
private final Ack ack;
private WriteAck(Ack ack) {
this.ack = ack;
}
public void run() throws IOException {
packetWriter.writeAck(ack);
LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck());
}
}
// This task runs on the database thread
private class GenerateBatch implements Runnable {
public void run() {
try {
Collection<byte[]> b = db.generateBatch(contactId,
MAX_PACKET_LENGTH, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if(b == null) decrementOutstandingQueries();
else writerTasks.add(new WriteBatch(b));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch;
private WriteBatch(Collection<byte[]> batch) {
this.batch = batch;
}
public void run() throws IOException {
for(byte[] raw : batch) packetWriter.writeMessage(raw);
LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
}
}
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if(a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteRetentionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements ThrowingRunnable<IOException> {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
}
}
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if(u == null) decrementOutstandingQueries();
else writerTasks.add(new WriteRetentionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate
implements ThrowingRunnable<IOException> {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() throws IOException {
packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if(a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteSubscriptionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck
implements ThrowingRunnable<IOException> {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
}
}
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if(u == null) decrementOutstandingQueries();
else writerTasks.add(new WriteSubscriptionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate
implements ThrowingRunnable<IOException> {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() throws IOException {
packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if(acks == null) decrementOutstandingQueries();
else writerTasks.add(new WriteTransportAcks(acks));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements ThrowingRunnable<IOException> {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() throws IOException {
for(TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
}
}
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if(t == null) decrementOutstandingQueries();
else writerTasks.add(new WriteTransportUpdates(t));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates
implements ThrowingRunnable<IOException> {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() throws IOException {
for(TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
}
}
}

View File

@@ -0,0 +1,6 @@
package org.briarproject.messaging;
interface ThrowingRunnable<T extends Throwable> {
public void run() throws T;
}

View File

@@ -1,871 +0,0 @@
package org.briarproject.messaging.duplex;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageExpiredEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.RemoteRetentionTimeUpdatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.Offer;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.Request;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.messaging.UnverifiedMessage;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReader;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriter;
import org.briarproject.api.transport.StreamWriterFactory;
import org.briarproject.util.ByteUtils;
abstract class DuplexConnection implements EventListener {
private static final Logger LOG =
Logger.getLogger(DuplexConnection.class.getName());
private static final Runnable CLOSE = new Runnable() {
public void run() {}
};
private static final Runnable DIE = new Runnable() {
public void run() {}
};
protected final DatabaseComponent db;
protected final EventBus eventBus;
protected final ConnectionRegistry connRegistry;
protected final StreamReaderFactory connReaderFactory;
protected final StreamWriterFactory connWriterFactory;
protected final PacketReaderFactory packetReaderFactory;
protected final PacketWriterFactory packetWriterFactory;
protected final StreamContext ctx;
protected final DuplexTransportConnection transport;
protected final ContactId contactId;
protected final TransportId transportId;
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final long maxLatency;
private final AtomicBoolean disposed;
private final BlockingQueue<Runnable> writerTasks;
private volatile PacketWriter writer = null;
DuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
EventBus eventBus, ConnectionRegistry connRegistry,
StreamReaderFactory connReaderFactory,
StreamWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory, StreamContext ctx,
DuplexTransportConnection transport) {
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.eventBus = eventBus;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
this.ctx = ctx;
this.transport = transport;
contactId = ctx.getContactId();
transportId = ctx.getTransportId();
maxLatency = transport.getMaxLatency();
disposed = new AtomicBoolean(false);
writerTasks = new LinkedBlockingQueue<Runnable>();
}
protected abstract StreamReader createStreamReader() throws IOException;
protected abstract StreamWriter createStreamWriter() throws IOException;
public void eventOccurred(Event e) {
if(e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if(contactId.equals(c.getContactId())) writerTasks.add(CLOSE);
} else if(e instanceof MessageAddedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof MessageExpiredEvent) {
dbExecutor.execute(new GenerateRetentionUpdate());
} else if(e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
if(l.getAffectedContacts().contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateOffer());
}
} else if(e instanceof LocalTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdates());
} else if(e instanceof MessageRequestedEvent) {
if(((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch());
} else if(e instanceof MessageToAckEvent) {
if(((MessageToAckEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateAck());
} else if(e instanceof MessageToRequestEvent) {
if(((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest());
} else if(e instanceof RemoteRetentionTimeUpdatedEvent) {
dbExecutor.execute(new GenerateRetentionAck());
} else if(e instanceof RemoteSubscriptionsUpdatedEvent) {
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof RemoteTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportAcks());
}
}
void read() {
try {
InputStream in = createStreamReader().getInputStream();
PacketReader reader = packetReaderFactory.createPacketReader(in);
LOG.info("Starting to read");
while(!reader.eof()) {
if(reader.hasAck()) {
Ack a = reader.readAck();
LOG.info("Received ack");
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage();
LOG.info("Received message");
cryptoExecutor.execute(new VerifyMessage(m));
} else if(reader.hasOffer()) {
Offer o = reader.readOffer();
LOG.info("Received offer");
dbExecutor.execute(new ReceiveOffer(o));
} else if(reader.hasRequest()) {
Request r = reader.readRequest();
LOG.info("Received request");
dbExecutor.execute(new ReceiveRequest(r));
} else if(reader.hasRetentionAck()) {
RetentionAck a = reader.readRetentionAck();
LOG.info("Received retention ack");
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if(reader.hasRetentionUpdate()) {
RetentionUpdate u = reader.readRetentionUpdate();
LOG.info("Received retention update");
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if(reader.hasSubscriptionAck()) {
SubscriptionAck a = reader.readSubscriptionAck();
LOG.info("Received subscription ack");
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = reader.readSubscriptionUpdate();
LOG.info("Received subscription update");
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportAck()) {
TransportAck a = reader.readTransportAck();
LOG.info("Received transport ack");
dbExecutor.execute(new ReceiveTransportAck(a));
} else if(reader.hasTransportUpdate()) {
TransportUpdate u = reader.readTransportUpdate();
LOG.info("Received transport update");
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
}
LOG.info("Finished reading");
writerTasks.add(CLOSE);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
writerTasks.add(DIE);
}
}
void write() {
connRegistry.registerConnection(contactId, transportId);
eventBus.addListener(this);
try {
OutputStream out = createStreamWriter().getOutputStream();
writer = packetWriterFactory.createPacketWriter(out, true);
LOG.info("Starting to write");
// Ensure the tag is sent
out.flush();
// Send the initial packets
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
dbExecutor.execute(new GenerateRequest());
// Main loop
Runnable task = null;
while(true) {
LOG.info("Waiting for something to write");
task = writerTasks.take();
if(task == CLOSE || task == DIE) break;
task.run();
}
LOG.info("Finished writing");
if(task == CLOSE) {
writer.flush();
writer.close();
dispose(false, true);
} else {
dispose(true, true);
}
} catch(InterruptedException e) {
LOG.warning("Interrupted while waiting for task");
Thread.currentThread().interrupt();
dispose(true, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
eventBus.removeListener(this);
connRegistry.unregisterConnection(contactId, transportId);
}
private void dispose(boolean exception, boolean recognised) {
if(disposed.getAndSet(true)) return;
if(LOG.isLoggable(INFO))
LOG.info("Disposing: " + exception + ", " + recognised);
ByteUtils.erase(ctx.getSecret());
try {
transport.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
// This task runs on the database thread
private class ReceiveAck implements Runnable {
private final Ack ack;
private ReceiveAck(Ack ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveAck(contactId, ack);
LOG.info("DB received ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a crypto thread
private class VerifyMessage implements Runnable {
private final UnverifiedMessage message;
private VerifyMessage(UnverifiedMessage message) {
this.message = message;
}
public void run() {
try {
Message m = messageVerifier.verifyMessage(message);
LOG.info("Verified message");
dbExecutor.execute(new ReceiveMessage(m));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveMessage implements Runnable {
private final Message message;
private ReceiveMessage(Message message) {
this.message = message;
}
public void run() {
try {
db.receiveMessage(contactId, message);
LOG.info("DB received message");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveOffer implements Runnable {
private final Offer offer;
private ReceiveOffer(Offer offer) {
this.offer = offer;
}
public void run() {
try {
db.receiveOffer(contactId, offer);
LOG.info("DB received offer");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveRequest implements Runnable {
private final Request request;
private ReceiveRequest(Request request) {
this.request = request;
}
public void run() {
try {
db.receiveRequest(contactId, request);
LOG.info("DB received request");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
private ReceiveRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveRetentionAck(contactId, ack);
LOG.info("DB received retention ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private ReceiveRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveRetentionUpdate(contactId, update);
LOG.info("DB received retention update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private ReceiveSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveSubscriptionAck(contactId, ack);
LOG.info("DB received subscription ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
private ReceiveSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveSubscriptionUpdate(contactId, update);
LOG.info("DB received subscription update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
private ReceiveTransportAck(TransportAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveTransportAck(contactId, ack);
LOG.info("DB received transport ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update;
private ReceiveTransportUpdate(TransportUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveTransportUpdate(contactId, update);
LOG.info("DB received transport update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the database thread
private class GenerateAck implements Runnable {
public void run() {
assert writer != null;
int maxMessages = writer.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if(a != null) writerTasks.add(new WriteAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteAck implements Runnable {
private final Ack ack;
private WriteAck(Ack ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeAck(ack);
LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateBatch implements Runnable {
public void run() {
assert writer != null;
try {
Collection<byte[]> b = db.generateRequestedBatch(contactId,
MAX_PACKET_LENGTH, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if(b != null) writerTasks.add(new WriteBatch(b));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteBatch implements Runnable {
private final Collection<byte[]> batch;
private WriteBatch(Collection<byte[]> batch) {
this.batch = batch;
}
public void run() {
assert writer != null;
try {
for(byte[] raw : batch) writer.writeMessage(raw);
LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateOffer implements Runnable {
public void run() {
assert writer != null;
int maxMessages = writer.getMaxMessagesForOffer(Long.MAX_VALUE);
try {
Offer o = db.generateOffer(contactId, maxMessages, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if(o != null) writerTasks.add(new WriteOffer(o));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteOffer implements Runnable {
private final Offer offer;
private WriteOffer(Offer offer) {
this.offer = offer;
}
public void run() {
assert writer != null;
try {
writer.writeOffer(offer);
LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateRequest implements Runnable {
public void run() {
assert writer != null;
int maxMessages = writer.getMaxMessagesForRequest(Long.MAX_VALUE);
try {
Request r = db.generateRequest(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if(r != null) writerTasks.add(new WriteRequest(r));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteRequest implements Runnable {
private final Request request;
private WriteRequest(Request request) {
this.request = request;
}
public void run() {
assert writer != null;
try {
writer.writeRequest(request);
LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if(a != null) writerTasks.add(new WriteRetentionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements Runnable {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeRetentionAck(ack);
LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if(u != null) writerTasks.add(new WriteRetentionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
assert writer != null;
try {
writer.writeRetentionUpdate(update);
LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if(a != null) writerTasks.add(new WriteSubscriptionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private WriteSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
assert writer != null;
try {
writer.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
private WriteSubscriptionUpdate(SubscriptionUpdate update) {
this.update = update;
}
public void run() {
assert writer != null;
try {
writer.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if(acks != null) writerTasks.add(new WriteTransportAcks(acks));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements Runnable {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() {
assert writer != null;
try {
for(TransportAck a : acks) writer.writeTransportAck(a);
LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if(t != null) writerTasks.add(new WriteTransportUpdates(t));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates implements Runnable {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() {
assert writer != null;
try {
for(TransportUpdate u : updates) writer.writeTransportUpdate(u);
LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
}
}
}

View File

@@ -1,108 +0,0 @@
package org.briarproject.messaging.duplex;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.crypto.KeyManager;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriterFactory;
class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
private static final Logger LOG =
Logger.getLogger(DuplexConnectionFactoryImpl.class.getName());
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final EventBus eventBus;
private final KeyManager keyManager;
private final ConnectionRegistry connRegistry;
private final StreamReaderFactory connReaderFactory;
private final StreamWriterFactory connWriterFactory;
private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory;
@Inject
DuplexConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
EventBus eventBus, KeyManager keyManager,
ConnectionRegistry connRegistry,
StreamReaderFactory connReaderFactory,
StreamWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) {
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.eventBus = eventBus;
this.keyManager = keyManager;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
}
public void createIncomingConnection(StreamContext ctx,
DuplexTransportConnection transport) {
final DuplexConnection conn = new IncomingDuplexConnection(dbExecutor,
cryptoExecutor, messageVerifier, db, eventBus, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
Runnable write = new Runnable() {
public void run() {
conn.write();
}
};
new Thread(write, "DuplexConnectionWriter").start();
Runnable read = new Runnable() {
public void run() {
conn.read();
}
};
new Thread(read, "DuplexConnectionReader").start();
}
public void createOutgoingConnection(ContactId c, TransportId t,
DuplexTransportConnection transport) {
StreamContext ctx = keyManager.getStreamContext(c, t);
if(ctx == null) {
LOG.warning("Could not create outgoing stream context");
return;
}
final DuplexConnection conn = new OutgoingDuplexConnection(dbExecutor,
cryptoExecutor, messageVerifier, db, eventBus, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
Runnable write = new Runnable() {
public void run() {
conn.write();
}
};
new Thread(write, "DuplexConnectionWriter").start();
Runnable read = new Runnable() {
public void run() {
conn.read();
}
};
new Thread(read, "DuplexConnectionReader").start();
}
}

View File

@@ -1,15 +0,0 @@
package org.briarproject.messaging.duplex;
import javax.inject.Singleton;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import com.google.inject.AbstractModule;
public class DuplexMessagingModule extends AbstractModule {
protected void configure() {
bind(DuplexConnectionFactory.class).to(
DuplexConnectionFactoryImpl.class).in(Singleton.class);
}
}

View File

@@ -1,51 +0,0 @@
package org.briarproject.messaging.duplex;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReader;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriter;
import org.briarproject.api.transport.StreamWriterFactory;
class IncomingDuplexConnection extends DuplexConnection {
IncomingDuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
EventBus eventBus, ConnectionRegistry connRegistry,
StreamReaderFactory connReaderFactory,
StreamWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory,
StreamContext ctx, DuplexTransportConnection transport) {
super(dbExecutor, cryptoExecutor, messageVerifier, db, eventBus,
connRegistry, connReaderFactory, connWriterFactory,
packetReaderFactory, packetWriterFactory, ctx, transport);
}
@Override
protected StreamReader createStreamReader() throws IOException {
InputStream in = transport.getInputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connReaderFactory.createStreamReader(in, maxFrameLength,
ctx, true, true);
}
@Override
protected StreamWriter createStreamWriter() throws IOException {
OutputStream out = transport.getOutputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connWriterFactory.createStreamWriter(out, maxFrameLength,
Long.MAX_VALUE, ctx, true, false);
}
}

View File

@@ -1,51 +0,0 @@
package org.briarproject.messaging.duplex;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReader;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriter;
import org.briarproject.api.transport.StreamWriterFactory;
class OutgoingDuplexConnection extends DuplexConnection {
OutgoingDuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
EventBus eventBus, ConnectionRegistry connRegistry,
StreamReaderFactory connReaderFactory,
StreamWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory, StreamContext ctx,
DuplexTransportConnection transport) {
super(dbExecutor, cryptoExecutor, messageVerifier, db, eventBus,
connRegistry, connReaderFactory, connWriterFactory,
packetReaderFactory, packetWriterFactory, ctx, transport);
}
@Override
protected StreamReader createStreamReader() throws IOException {
InputStream in = transport.getInputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connReaderFactory.createStreamReader(in, maxFrameLength,
ctx, false, false);
}
@Override
protected StreamWriter createStreamWriter() throws IOException {
OutputStream out = transport.getOutputStream();
int maxFrameLength = transport.getMaxFrameLength();
return connWriterFactory.createStreamWriter(out, maxFrameLength,
Long.MAX_VALUE, ctx, false, true);
}
}

View File

@@ -1,187 +0,0 @@
package org.briarproject.messaging.simplex;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamWriter;
import org.briarproject.api.transport.StreamWriterFactory;
import org.briarproject.util.ByteUtils;
class OutgoingSimplexConnection {
private static final Logger LOG =
Logger.getLogger(OutgoingSimplexConnection.class.getName());
private final DatabaseComponent db;
private final ConnectionRegistry connRegistry;
private final StreamWriterFactory connWriterFactory;
private final PacketWriterFactory packetWriterFactory;
private final StreamContext ctx;
private final SimplexTransportWriter transport;
private final ContactId contactId;
private final TransportId transportId;
private final long maxLatency;
OutgoingSimplexConnection(DatabaseComponent db,
ConnectionRegistry connRegistry,
StreamWriterFactory connWriterFactory,
PacketWriterFactory packetWriterFactory, StreamContext ctx,
SimplexTransportWriter transport) {
this.db = db;
this.connRegistry = connRegistry;
this.connWriterFactory = connWriterFactory;
this.packetWriterFactory = packetWriterFactory;
this.ctx = ctx;
this.transport = transport;
contactId = ctx.getContactId();
transportId = ctx.getTransportId();
maxLatency = transport.getMaxLatency();
}
void write() {
connRegistry.registerConnection(contactId, transportId);
try {
OutputStream out = transport.getOutputStream();
long capacity = transport.getCapacity();
int maxFrameLength = transport.getMaxFrameLength();
StreamWriter conn = connWriterFactory.createStreamWriter(
out, maxFrameLength, capacity, ctx, false, true);
out = conn.getOutputStream();
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH)
throw new EOFException();
PacketWriter writer = packetWriterFactory.createPacketWriter(out,
false);
// Send the initial packets: updates and acks
boolean hasSpace = writeTransportAcks(conn, writer);
if(hasSpace) hasSpace = writeTransportUpdates(conn, writer);
if(hasSpace) hasSpace = writeSubscriptionAck(conn, writer);
if(hasSpace) hasSpace = writeSubscriptionUpdate(conn, writer);
if(hasSpace) hasSpace = writeRetentionAck(conn, writer);
if(hasSpace) hasSpace = writeRetentionUpdate(conn, writer);
// Write acks until you can't write acks no more
capacity = conn.getRemainingCapacity();
int maxMessages = writer.getMaxMessagesForAck(capacity);
Ack a = db.generateAck(contactId, maxMessages);
while(a != null) {
writer.writeAck(a);
capacity = conn.getRemainingCapacity();
maxMessages = writer.getMaxMessagesForAck(capacity);
a = db.generateAck(contactId, maxMessages);
}
// Write messages until you can't write messages no more
capacity = conn.getRemainingCapacity();
int maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
Collection<byte[]> batch = db.generateBatch(contactId, maxLength,
maxLatency);
while(batch != null) {
for(byte[] raw : batch) writer.writeMessage(raw);
capacity = conn.getRemainingCapacity();
maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
batch = db.generateBatch(contactId, maxLength, maxLatency);
}
writer.flush();
writer.close();
dispose(false);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true);
}
connRegistry.unregisterConnection(contactId, transportId);
}
private boolean writeTransportAcks(StreamWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
Collection<TransportAck> acks = db.generateTransportAcks(contactId);
if(acks == null) return true;
for(TransportAck a : acks) {
writer.writeTransportAck(a);
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH) return false;
}
return true;
}
private boolean writeTransportUpdates(StreamWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
Collection<TransportUpdate> updates =
db.generateTransportUpdates(contactId, maxLatency);
if(updates == null) return true;
for(TransportUpdate u : updates) {
writer.writeTransportUpdate(u);
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH) return false;
}
return true;
}
private boolean writeSubscriptionAck(StreamWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(a == null) return true;
writer.writeSubscriptionAck(a);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeSubscriptionUpdate(StreamWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if(u == null) return true;
writer.writeSubscriptionUpdate(u);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeRetentionAck(StreamWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
RetentionAck a = db.generateRetentionAck(contactId);
if(a == null) return true;
writer.writeRetentionAck(a);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeRetentionUpdate(StreamWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
RetentionUpdate u = db.generateRetentionUpdate(contactId, maxLatency);
if(u == null) return true;
writer.writeRetentionUpdate(u);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private void dispose(boolean exception) {
ByteUtils.erase(ctx.getSecret());
try {
transport.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}

View File

@@ -1,90 +0,0 @@
package org.briarproject.messaging.simplex;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.crypto.KeyManager;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriterFactory;
class SimplexConnectionFactoryImpl implements SimplexConnectionFactory {
private static final Logger LOG =
Logger.getLogger(SimplexConnectionFactoryImpl.class.getName());
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final KeyManager keyManager;
private final ConnectionRegistry connRegistry;
private final StreamReaderFactory connReaderFactory;
private final StreamWriterFactory connWriterFactory;
private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory;
@Inject
SimplexConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
KeyManager keyManager, ConnectionRegistry connRegistry,
StreamReaderFactory connReaderFactory,
StreamWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) {
this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.keyManager = keyManager;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory;
}
public void createIncomingConnection(StreamContext ctx,
SimplexTransportReader r) {
final IncomingSimplexConnection conn = new IncomingSimplexConnection(
dbExecutor, cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, packetReaderFactory, ctx, r);
Runnable read = new Runnable() {
public void run() {
conn.read();
}
};
new Thread(read, "SimplexConnectionReader").start();
}
public void createOutgoingConnection(ContactId c, TransportId t,
SimplexTransportWriter w) {
StreamContext ctx = keyManager.getStreamContext(c, t);
if(ctx == null) {
LOG.warning("Could not create outgoing connection context");
return;
}
final OutgoingSimplexConnection conn = new OutgoingSimplexConnection(db,
connRegistry, connWriterFactory, packetWriterFactory, ctx, w);
Runnable write = new Runnable() {
public void run() {
conn.write();
}
};
new Thread(write, "SimplexConnectionWriter").start();
}
}

View File

@@ -1,15 +0,0 @@
package org.briarproject.messaging.simplex;
import javax.inject.Singleton;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import com.google.inject.AbstractModule;
public class SimplexMessagingModule extends AbstractModule {
protected void configure() {
bind(SimplexConnectionFactory.class).to(
SimplexConnectionFactoryImpl.class).in(Singleton.class);
}
}

View File

@@ -27,6 +27,8 @@ import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.plugins.TransportConnectionReader;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexPluginConfig;
@@ -36,8 +38,6 @@ import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.briarproject.api.plugins.simplex.SimplexPluginCallback;
import org.briarproject.api.plugins.simplex.SimplexPluginConfig;
import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.system.Clock;
import org.briarproject.api.transport.ConnectionDispatcher;
import org.briarproject.api.ui.UiCallback;
@@ -377,11 +377,11 @@ class PluginManagerImpl implements PluginManager {
super(id);
}
public void readerCreated(SimplexTransportReader r) {
public void readerCreated(TransportConnectionReader r) {
dispatcher.dispatchIncomingConnection(id, r);
}
public void writerCreated(ContactId c, SimplexTransportWriter w) {
public void writerCreated(ContactId c, TransportConnectionWriter w) {
dispatcher.dispatchOutgoingConnection(c, id, w);
}
}

View File

@@ -20,14 +20,14 @@ class PollerImpl implements Poller {
Logger.getLogger(PollerImpl.class.getName());
private final Executor ioExecutor;
private final ConnectionRegistry connRegistry;
private final ConnectionRegistry connectionRegistry;
private final Timer timer;
@Inject
PollerImpl(@IoExecutor Executor ioExecutor, ConnectionRegistry connRegistry,
Timer timer) {
PollerImpl(@IoExecutor Executor ioExecutor,
ConnectionRegistry connectionRegistry, Timer timer) {
this.ioExecutor = ioExecutor;
this.connRegistry = connRegistry;
this.connectionRegistry = connectionRegistry;
this.timer = timer;
}
@@ -53,7 +53,7 @@ class PollerImpl implements Poller {
public void run() {
if(LOG.isLoggable(INFO))
LOG.info("Polling " + p.getClass().getSimpleName());
p.poll(connRegistry.getConnectedContacts(p.getId()));
p.poll(connectionRegistry.getConnectedContacts(p.getId()));
}
});
}

View File

@@ -14,10 +14,10 @@ import java.util.concurrent.Executor;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.plugins.TransportConnectionReader;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.briarproject.api.plugins.simplex.SimplexPluginCallback;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.system.FileUtils;
public abstract class FilePlugin implements SimplexPlugin {
@@ -60,11 +60,11 @@ public abstract class FilePlugin implements SimplexPlugin {
return running;
}
public SimplexTransportReader createReader(ContactId c) {
public TransportConnectionReader createReader(ContactId c) {
return null;
}
public SimplexTransportWriter createWriter(ContactId c) {
public TransportConnectionWriter createWriter(ContactId c) {
if(!running) return null;
return createWriter(createConnectionFilename());
}
@@ -81,7 +81,7 @@ public abstract class FilePlugin implements SimplexPlugin {
return filename.toLowerCase(Locale.US).matches("[a-z]{8}\\.dat");
}
private SimplexTransportWriter createWriter(String filename) {
private TransportConnectionWriter createWriter(String filename) {
if(!running) return null;
File dir = chooseOutputDirectory();
if(dir == null || !dir.exists() || !dir.isDirectory()) return null;

View File

@@ -7,9 +7,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.logging.Logger;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.plugins.TransportConnectionReader;
class FileTransportReader implements SimplexTransportReader {
class FileTransportReader implements TransportConnectionReader {
private static final Logger LOG =
Logger.getLogger(FileTransportReader.class.getName());
@@ -28,6 +28,10 @@ class FileTransportReader implements SimplexTransportReader {
return plugin.getMaxFrameLength();
}
public long getMaxLatency() {
return plugin.getMaxLatency();
}
public InputStream getInputStream() {
return in;
}

View File

@@ -7,9 +7,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Logger;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.plugins.TransportConnectionWriter;
class FileTransportWriter implements SimplexTransportWriter {
class FileTransportWriter implements TransportConnectionWriter {
private static final Logger LOG =
Logger.getLogger(FileTransportWriter.class.getName());
@@ -27,10 +27,6 @@ class FileTransportWriter implements SimplexTransportWriter {
this.plugin = plugin;
}
public long getCapacity() {
return capacity;
}
public int getMaxFrameLength() {
return plugin.getMaxFrameLength();
}
@@ -39,6 +35,10 @@ class FileTransportWriter implements SimplexTransportWriter {
return plugin.getMaxLatency();
}
public long getCapacity() {
return capacity;
}
public OutputStream getOutputStream() {
return out;
}

View File

@@ -4,38 +4,80 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicBoolean;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.TransportConnectionReader;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
class TcpTransportConnection implements DuplexTransportConnection {
private final Plugin plugin;
private final Socket socket;
private final Reader reader;
private final Writer writer;
private final AtomicBoolean halfClosed, closed;
TcpTransportConnection(Plugin plugin, Socket socket) {
this.plugin = plugin;
this.socket = socket;
reader = new Reader();
writer = new Writer();
halfClosed = new AtomicBoolean(false);
closed = new AtomicBoolean(false);
}
public int getMaxFrameLength() {
return plugin.getMaxFrameLength();
public TransportConnectionReader getReader() {
return reader;
}
public long getMaxLatency() {
return plugin.getMaxLatency();
public TransportConnectionWriter getWriter() {
return writer;
}
public InputStream getInputStream() throws IOException {
return socket.getInputStream();
private class Reader implements TransportConnectionReader {
public int getMaxFrameLength() {
return plugin.getMaxFrameLength();
}
public long getMaxLatency() {
return plugin.getMaxLatency();
}
public InputStream getInputStream() throws IOException {
return socket.getInputStream();
}
public void dispose(boolean exception, boolean recognised)
throws IOException {
if(halfClosed.getAndSet(true) || exception)
if(!closed.getAndSet(true)) socket.close();
}
}
public OutputStream getOutputStream() throws IOException {
return socket.getOutputStream();
}
private class Writer implements TransportConnectionWriter {
public void dispose(boolean exception, boolean recognised)
throws IOException {
socket.close();
public int getMaxFrameLength() {
return plugin.getMaxFrameLength();
}
public long getMaxLatency() {
return plugin.getMaxLatency();
}
public long getCapacity() {
return Long.MAX_VALUE;
}
public OutputStream getOutputStream() throws IOException {
return socket.getOutputStream();
}
public void dispose(boolean exception) throws IOException {
if(halfClosed.getAndSet(true) || exception)
if(!closed.getAndSet(true)) socket.close();
}
}
}

View File

@@ -13,14 +13,16 @@ import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.crypto.KeyManager;
import org.briarproject.api.db.DbException;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.MessagingSessionFactory;
import org.briarproject.api.plugins.TransportConnectionReader;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexTransportReader;
import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionDispatcher;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.TagRecogniser;
@@ -30,132 +32,280 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
Logger.getLogger(ConnectionDispatcherImpl.class.getName());
private final Executor ioExecutor;
private final KeyManager keyManager;
private final TagRecogniser tagRecogniser;
private final SimplexConnectionFactory simplexConnFactory;
private final DuplexConnectionFactory duplexConnFactory;
private final MessagingSessionFactory messagingSessionFactory;
private final ConnectionRegistry connectionRegistry;
@Inject
ConnectionDispatcherImpl(@IoExecutor Executor ioExecutor,
TagRecogniser tagRecogniser,
SimplexConnectionFactory simplexConnFactory,
DuplexConnectionFactory duplexConnFactory) {
KeyManager keyManager, TagRecogniser tagRecogniser,
MessagingSessionFactory messagingSessionFactory,
ConnectionRegistry connectionRegistry) {
this.ioExecutor = ioExecutor;
this.keyManager = keyManager;
this.tagRecogniser = tagRecogniser;
this.simplexConnFactory = simplexConnFactory;
this.duplexConnFactory = duplexConnFactory;
this.messagingSessionFactory = messagingSessionFactory;
this.connectionRegistry = connectionRegistry;
}
public void dispatchIncomingConnection(TransportId t,
SimplexTransportReader r) {
ioExecutor.execute(new DispatchSimplexConnection(t, r));
TransportConnectionReader r) {
ioExecutor.execute(new DispatchIncomingSimplexConnection(t, r));
}
public void dispatchIncomingConnection(TransportId t,
DuplexTransportConnection d) {
ioExecutor.execute(new DispatchDuplexConnection(t, d));
ioExecutor.execute(new DispatchIncomingDuplexConnection(t, d));
}
public void dispatchOutgoingConnection(ContactId c, TransportId t,
SimplexTransportWriter w) {
simplexConnFactory.createOutgoingConnection(c, t, w);
TransportConnectionWriter w) {
ioExecutor.execute(new DispatchOutgoingSimplexConnection(c, t, w));
}
public void dispatchOutgoingConnection(ContactId c, TransportId t,
DuplexTransportConnection d) {
duplexConnFactory.createOutgoingConnection(c, t, d);
ioExecutor.execute(new DispatchOutgoingDuplexConnection(c, t, d));
}
private byte[] readTag(InputStream in) throws IOException {
byte[] b = new byte[TAG_LENGTH];
int offset = 0;
while(offset < b.length) {
int read = in.read(b, offset, b.length - offset);
if(read == -1) throw new EOFException();
offset += read;
private StreamContext readAndRecogniseTag(TransportId t,
TransportConnectionReader r) {
// Read the tag
byte[] tag = new byte[TAG_LENGTH];
try {
InputStream in = r.getInputStream();
int offset = 0;
while(offset < tag.length) {
int read = in.read(tag, offset, tag.length - offset);
if(read == -1) throw new EOFException();
offset += read;
}
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(r, true, false);
return null;
}
return b;
// Recognise the tag
StreamContext ctx = null;
try {
ctx = tagRecogniser.recogniseTag(t, tag);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(r, true, false);
return null;
}
if(ctx == null) dispose(r, false, false);
return ctx;
}
private class DispatchSimplexConnection implements Runnable {
private void runAndDispose(StreamContext ctx, TransportConnectionReader r) {
MessagingSession in =
messagingSessionFactory.createIncomingSession(ctx, r);
ContactId contactId = ctx.getContactId();
TransportId transportId = ctx.getTransportId();
connectionRegistry.registerConnection(contactId, transportId);
try {
in.run();
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(r, true, true);
return;
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
dispose(r, false, true);
}
private void dispose(TransportConnectionReader r, boolean exception,
boolean recognised) {
try {
r.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void runAndDispose(StreamContext ctx, TransportConnectionWriter w,
boolean duplex) {
MessagingSession out =
messagingSessionFactory.createOutgoingSession(ctx, w, duplex);
ContactId contactId = ctx.getContactId();
TransportId transportId = ctx.getTransportId();
connectionRegistry.registerConnection(contactId, transportId);
try {
out.run();
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(w, true);
return;
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
dispose(w, false);
}
private void dispose(TransportConnectionWriter w, boolean exception) {
try {
w.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private class DispatchIncomingSimplexConnection implements Runnable {
private final TransportId transportId;
private final SimplexTransportReader transport;
private final TransportConnectionReader reader;
private DispatchSimplexConnection(TransportId transportId,
SimplexTransportReader transport) {
private DispatchIncomingSimplexConnection(TransportId transportId,
TransportConnectionReader reader) {
this.transportId = transportId;
this.transport = transport;
this.reader = reader;
}
public void run() {
try {
byte[] tag = readTag(transport.getInputStream());
StreamContext ctx = tagRecogniser.recogniseTag(transportId,
tag);
if(ctx == null) {
transport.dispose(false, false);
} else {
simplexConnFactory.createIncomingConnection(ctx,
transport);
}
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
try {
transport.dispose(true, false);
} catch(IOException e1) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e1.toString(), e1);
}
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
try {
transport.dispose(true, false);
} catch(IOException e1) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e1.toString(), e1);
}
}
// Read and recognise the tag
StreamContext ctx = readAndRecogniseTag(transportId, reader);
if(ctx == null) return;
// Run the incoming session
runAndDispose(ctx, reader);
}
}
private class DispatchDuplexConnection implements Runnable {
private class DispatchOutgoingSimplexConnection implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionWriter writer;
private DispatchOutgoingSimplexConnection(ContactId contactId,
TransportId transportId, TransportConnectionWriter writer) {
this.contactId = contactId;
this.transportId = transportId;
this.writer = writer;
}
public void run() {
// Allocate a stream context
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
dispose(writer, false);
return;
}
// Run the outgoing session
runAndDispose(ctx, writer, false);
}
}
private class DispatchIncomingDuplexConnection implements Runnable {
private final TransportId transportId;
private final DuplexTransportConnection transport;
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private DispatchDuplexConnection(TransportId transportId,
private DispatchIncomingDuplexConnection(TransportId transportId,
DuplexTransportConnection transport) {
this.transportId = transportId;
this.transport = transport;
reader = transport.getReader();
writer = transport.getWriter();
}
public void run() {
byte[] tag;
try {
tag = readTag(transport.getInputStream());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, false);
return;
}
StreamContext ctx = null;
try {
ctx = tagRecogniser.recogniseTag(transportId, tag);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, false);
return;
}
if(ctx == null) dispose(false, false);
else duplexConnFactory.createIncomingConnection(ctx, transport);
// Read and recognise the tag
StreamContext ctx = readAndRecogniseTag(transportId, reader);
if(ctx == null) return;
// Start the outgoing session on another thread
ioExecutor.execute(new DispatchIncomingDuplexConnectionSide2(
ctx.getContactId(), transportId, writer));
// Run the incoming session
runAndDispose(ctx, reader);
}
}
private class DispatchIncomingDuplexConnectionSide2 implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionWriter writer;
private DispatchIncomingDuplexConnectionSide2(ContactId contactId,
TransportId transportId, TransportConnectionWriter writer) {
this.contactId = contactId;
this.transportId = transportId;
this.writer = writer;
}
private void dispose(boolean exception, boolean recognised) {
try {
transport.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
public void run() {
// Allocate a stream context
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
dispose(writer, false);
return;
}
// Run the outgoing session
runAndDispose(ctx, writer, true);
}
}
private class DispatchOutgoingDuplexConnection implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private DispatchOutgoingDuplexConnection(ContactId contactId,
TransportId transportId, DuplexTransportConnection transport) {
this.contactId = contactId;
this.transportId = transportId;
reader = transport.getReader();
writer = transport.getWriter();
}
public void run() {
// Allocate a stream context
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
dispose(writer, false);
return;
}
// Start the incoming session on another thread
ioExecutor.execute(new DispatchOutgoingDuplexConnectionSide2(
contactId, transportId, reader));
// Run the outgoing session
runAndDispose(ctx, writer, true);
}
}
private class DispatchOutgoingDuplexConnectionSide2 implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionReader reader;
private DispatchOutgoingDuplexConnectionSide2(ContactId contactId,
TransportId transportId, TransportConnectionReader reader) {
this.contactId = contactId;
this.transportId = transportId;
this.reader = reader;
}
public void run() {
// Read and recognise the tag
StreamContext ctx = readAndRecogniseTag(transportId, reader);
if(ctx == null) return;
// Check that the stream comes from the expected contact
if(!ctx.getContactId().equals(contactId)) {
LOG.warning("Wrong contact ID for duplex connection");
dispose(reader, true, true);
return;
}
// Run the incoming session
runAndDispose(ctx, reader);
}
}
}

View File

@@ -8,9 +8,6 @@ interface FrameWriter {
void writeFrame(byte[] frame, int payloadLength, boolean finalFrame)
throws IOException;
/** Flushes the stack. */
/** Flushes the stream. */
void flush() throws IOException;
/** Returns the maximum number of bytes that can be written. */
long getRemainingCapacity();
}

View File

@@ -19,22 +19,18 @@ class OutgoingEncryptionLayer implements FrameWriter {
private final AuthenticatedCipher frameCipher;
private final SecretKey frameKey;
private final byte[] tag, iv, aad, ciphertext;
private final int frameLength, maxPayloadLength;
private final int frameLength;
private long capacity, frameNumber;
private long frameNumber;
private boolean writeTag;
/** Constructor for the initiator's side of a connection. */
OutgoingEncryptionLayer(OutputStream out, long capacity,
AuthenticatedCipher frameCipher, SecretKey frameKey,
int frameLength, byte[] tag) {
OutgoingEncryptionLayer(OutputStream out, AuthenticatedCipher frameCipher,
SecretKey frameKey, int frameLength, byte[] tag) {
this.out = out;
this.capacity = capacity;
this.frameCipher = frameCipher;
this.frameKey = frameKey;
this.frameLength = frameLength;
this.tag = tag;
maxPayloadLength = frameLength - HEADER_LENGTH - MAC_LENGTH;
iv = new byte[IV_LENGTH];
aad = new byte[AAD_LENGTH];
ciphertext = new byte[frameLength];
@@ -42,24 +38,6 @@ class OutgoingEncryptionLayer implements FrameWriter {
writeTag = true;
}
/** Constructor for the responder's side of a connection. */
OutgoingEncryptionLayer(OutputStream out, long capacity,
AuthenticatedCipher frameCipher, SecretKey frameKey,
int frameLength) {
this.out = out;
this.capacity = capacity;
this.frameCipher = frameCipher;
this.frameKey = frameKey;
this.frameLength = frameLength;
tag = null;
maxPayloadLength = frameLength - HEADER_LENGTH - MAC_LENGTH;
iv = new byte[IV_LENGTH];
aad = new byte[AAD_LENGTH];
ciphertext = new byte[frameLength];
frameNumber = 0;
writeTag = false;
}
public void writeFrame(byte[] frame, int payloadLength, boolean finalFrame)
throws IOException {
if(frameNumber > MAX_32_BIT_UNSIGNED) throw new IllegalStateException();
@@ -71,7 +49,6 @@ class OutgoingEncryptionLayer implements FrameWriter {
frameKey.erase();
throw e;
}
capacity -= tag.length;
writeTag = false;
}
// Encode the header
@@ -107,7 +84,6 @@ class OutgoingEncryptionLayer implements FrameWriter {
frameKey.erase();
throw e;
}
capacity -= ciphertextLength;
frameNumber++;
}
@@ -120,33 +96,8 @@ class OutgoingEncryptionLayer implements FrameWriter {
frameKey.erase();
throw e;
}
capacity -= tag.length;
writeTag = false;
}
out.flush();
}
public long getRemainingCapacity() {
// How many frame numbers can we use?
long frameNumbers = MAX_32_BIT_UNSIGNED - frameNumber + 1;
// How many full frames do we have space for?
long bytes = writeTag ? capacity - tag.length : capacity;
long fullFrames = bytes / frameLength;
// Are we limited by frame numbers or space?
if(frameNumbers > fullFrames) {
// Can we send a partial frame after the full frames?
int partialFrame = (int) (bytes - fullFrames * frameLength);
if(partialFrame > HEADER_LENGTH + MAC_LENGTH) {
// Send full frames and a partial frame, limited by space
int partialPayload = partialFrame - HEADER_LENGTH - MAC_LENGTH;
return maxPayloadLength * fullFrames + partialPayload;
} else {
// Send full frames only, limited by space
return maxPayloadLength * fullFrames;
}
} else {
// Send full frames only, limited by frame numbers
return maxPayloadLength * frameNumbers;
}
}
}

View File

@@ -20,24 +20,21 @@ class StreamReaderFactoryImpl implements StreamReaderFactory {
}
public StreamReader createStreamReader(InputStream in,
int maxFrameLength, StreamContext ctx, boolean incoming,
boolean initiator) {
int maxFrameLength, StreamContext ctx) {
byte[] secret = ctx.getSecret();
long streamNumber = ctx.getStreamNumber();
boolean weAreAlice = ctx.getAlice();
boolean initiatorIsAlice = incoming ? !weAreAlice : weAreAlice;
SecretKey frameKey = crypto.deriveFrameKey(secret, streamNumber,
initiatorIsAlice, initiator);
FrameReader encryption = new IncomingEncryptionLayer(in,
boolean alice = !ctx.getAlice();
SecretKey frameKey = crypto.deriveFrameKey(secret, streamNumber, alice);
FrameReader frameReader = new IncomingEncryptionLayer(in,
crypto.getFrameCipher(), frameKey, maxFrameLength);
return new StreamReaderImpl(encryption, maxFrameLength);
return new StreamReaderImpl(frameReader, maxFrameLength);
}
public StreamReader createInvitationStreamReader(InputStream in,
int maxFrameLength, byte[] secret, boolean alice) {
SecretKey frameKey = crypto.deriveFrameKey(secret, 0, true, alice);
FrameReader encryption = new IncomingEncryptionLayer(in,
SecretKey frameKey = crypto.deriveFrameKey(secret, 0, alice);
FrameReader frameReader = new IncomingEncryptionLayer(in,
crypto.getFrameCipher(), frameKey, maxFrameLength);
return new StreamReaderImpl(encryption, maxFrameLength);
return new StreamReaderImpl(frameReader, maxFrameLength);
}
}

View File

@@ -22,35 +22,29 @@ class StreamWriterFactoryImpl implements StreamWriterFactory {
}
public StreamWriter createStreamWriter(OutputStream out,
int maxFrameLength, long capacity, StreamContext ctx,
boolean incoming, boolean initiator) {
int maxFrameLength, StreamContext ctx) {
byte[] secret = ctx.getSecret();
long streamNumber = ctx.getStreamNumber();
boolean weAreAlice = ctx.getAlice();
boolean initiatorIsAlice = incoming ? !weAreAlice : weAreAlice;
SecretKey frameKey = crypto.deriveFrameKey(secret, streamNumber,
initiatorIsAlice, initiator);
FrameWriter encryption;
if(initiator) {
byte[] tag = new byte[TAG_LENGTH];
SecretKey tagKey = crypto.deriveTagKey(secret, initiatorIsAlice);
crypto.encodeTag(tag, tagKey, streamNumber);
tagKey.erase();
encryption = new OutgoingEncryptionLayer(out, capacity,
crypto.getFrameCipher(), frameKey, maxFrameLength, tag);
} else {
encryption = new OutgoingEncryptionLayer(out, capacity,
crypto.getFrameCipher(), frameKey, maxFrameLength);
}
return new StreamWriterImpl(encryption, maxFrameLength);
boolean alice = ctx.getAlice();
byte[] tag = new byte[TAG_LENGTH];
SecretKey tagKey = crypto.deriveTagKey(secret, alice);
crypto.encodeTag(tag, tagKey, streamNumber);
tagKey.erase();
SecretKey frameKey = crypto.deriveFrameKey(secret, streamNumber, alice);
FrameWriter frameWriter = new OutgoingEncryptionLayer(out,
crypto.getFrameCipher(), frameKey, maxFrameLength, tag);
return new StreamWriterImpl(frameWriter, maxFrameLength);
}
public StreamWriter createInvitationStreamWriter(OutputStream out,
int maxFrameLength, byte[] secret, boolean alice) {
SecretKey frameKey = crypto.deriveFrameKey(secret, 0, true, alice);
FrameWriter encryption = new OutgoingEncryptionLayer(out,
Long.MAX_VALUE, crypto.getFrameCipher(), frameKey,
maxFrameLength);
return new StreamWriterImpl(encryption, maxFrameLength);
byte[] tag = new byte[TAG_LENGTH];
SecretKey tagKey = crypto.deriveTagKey(secret, alice);
crypto.encodeTag(tag, tagKey, 0);
tagKey.erase();
SecretKey frameKey = crypto.deriveFrameKey(secret, 0, alice);
FrameWriter frameWriter = new OutgoingEncryptionLayer(out,
crypto.getFrameCipher(), frameKey, maxFrameLength, tag);
return new StreamWriterImpl(frameWriter, maxFrameLength);
}
}

View File

@@ -33,10 +33,6 @@ class StreamWriterImpl extends OutputStream implements StreamWriter {
return this;
}
public long getRemainingCapacity() {
return out.getRemainingCapacity();
}
@Override
public void close() throws IOException {
writeFrame(true);