mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-18 13:49:53 +01:00
Factored out StreamReader/Writer from messaging layer.
This commit is contained in:
@@ -25,10 +25,6 @@ import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.messaging.TransportAck;
|
||||
import org.briarproject.api.messaging.TransportUpdate;
|
||||
import org.briarproject.api.messaging.UnverifiedMessage;
|
||||
import org.briarproject.api.plugins.TransportConnectionReader;
|
||||
import org.briarproject.api.transport.StreamContext;
|
||||
import org.briarproject.api.transport.StreamReader;
|
||||
import org.briarproject.api.transport.StreamReaderFactory;
|
||||
|
||||
/**
|
||||
* An incoming {@link org.briarproject.api.messaging.MessagingSession
|
||||
@@ -42,36 +38,26 @@ class IncomingSession implements MessagingSession {
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor, cryptoExecutor;
|
||||
private final MessageVerifier messageVerifier;
|
||||
private final StreamReaderFactory streamReaderFactory;
|
||||
private final PacketReaderFactory packetReaderFactory;
|
||||
private final StreamContext ctx;
|
||||
private final TransportConnectionReader transportReader;
|
||||
private final ContactId contactId;
|
||||
private final InputStream in;
|
||||
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
IncomingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
Executor cryptoExecutor, MessageVerifier messageVerifier,
|
||||
StreamReaderFactory streamReaderFactory,
|
||||
PacketReaderFactory packetReaderFactory, StreamContext ctx,
|
||||
TransportConnectionReader transportReader) {
|
||||
PacketReaderFactory packetReaderFactory, ContactId contactId,
|
||||
InputStream in) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.cryptoExecutor = cryptoExecutor;
|
||||
this.messageVerifier = messageVerifier;
|
||||
this.streamReaderFactory = streamReaderFactory;
|
||||
this.packetReaderFactory = packetReaderFactory;
|
||||
this.ctx = ctx;
|
||||
this.transportReader = transportReader;
|
||||
contactId = ctx.getContactId();
|
||||
this.contactId = contactId;
|
||||
this.in = in;
|
||||
}
|
||||
|
||||
public void run() throws IOException {
|
||||
InputStream in = transportReader.getInputStream();
|
||||
int maxFrameLength = transportReader.getMaxFrameLength();
|
||||
StreamReader streamReader = streamReaderFactory.createStreamReader(in,
|
||||
maxFrameLength, ctx);
|
||||
in = streamReader.getInputStream();
|
||||
PacketReader packetReader = packetReaderFactory.createPacketReader(in);
|
||||
// Read packets until interrupted or EOF
|
||||
while(!interrupted && !packetReader.eof()) {
|
||||
|
||||
@@ -1,23 +1,22 @@
|
||||
package org.briarproject.messaging;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.briarproject.api.ContactId;
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.crypto.CryptoExecutor;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DatabaseExecutor;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.messaging.MessageVerifier;
|
||||
import org.briarproject.api.messaging.MessagingSession;
|
||||
import org.briarproject.api.messaging.MessagingSessionFactory;
|
||||
import org.briarproject.api.messaging.PacketReaderFactory;
|
||||
import org.briarproject.api.messaging.PacketWriterFactory;
|
||||
import org.briarproject.api.messaging.MessagingSessionFactory;
|
||||
import org.briarproject.api.plugins.TransportConnectionReader;
|
||||
import org.briarproject.api.plugins.TransportConnectionWriter;
|
||||
import org.briarproject.api.transport.StreamContext;
|
||||
import org.briarproject.api.transport.StreamReaderFactory;
|
||||
import org.briarproject.api.transport.StreamWriterFactory;
|
||||
|
||||
class MessagingSessionFactoryImpl implements MessagingSessionFactory {
|
||||
|
||||
@@ -25,8 +24,6 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
|
||||
private final Executor dbExecutor, cryptoExecutor;
|
||||
private final MessageVerifier messageVerifier;
|
||||
private final EventBus eventBus;
|
||||
private final StreamReaderFactory streamReaderFactory;
|
||||
private final StreamWriterFactory streamWriterFactory;
|
||||
private final PacketReaderFactory packetReaderFactory;
|
||||
private final PacketWriterFactory packetWriterFactory;
|
||||
|
||||
@@ -35,8 +32,6 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
|
||||
@DatabaseExecutor Executor dbExecutor,
|
||||
@CryptoExecutor Executor cryptoExecutor,
|
||||
MessageVerifier messageVerifier, EventBus eventBus,
|
||||
StreamReaderFactory streamReaderFactory,
|
||||
StreamWriterFactory streamWriterFactory,
|
||||
PacketReaderFactory packetReaderFactory,
|
||||
PacketWriterFactory packetWriterFactory) {
|
||||
this.db = db;
|
||||
@@ -44,24 +39,20 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
|
||||
this.cryptoExecutor = cryptoExecutor;
|
||||
this.messageVerifier = messageVerifier;
|
||||
this.eventBus = eventBus;
|
||||
this.streamReaderFactory = streamReaderFactory;
|
||||
this.streamWriterFactory = streamWriterFactory;
|
||||
this.packetReaderFactory = packetReaderFactory;
|
||||
this.packetWriterFactory = packetWriterFactory;
|
||||
}
|
||||
|
||||
public MessagingSession createIncomingSession(StreamContext ctx,
|
||||
TransportConnectionReader r) {
|
||||
public MessagingSession createIncomingSession(ContactId c, InputStream in) {
|
||||
return new IncomingSession(db, dbExecutor, cryptoExecutor,
|
||||
messageVerifier, streamReaderFactory, packetReaderFactory,
|
||||
ctx, r);
|
||||
messageVerifier, packetReaderFactory, c, in);
|
||||
}
|
||||
|
||||
public MessagingSession createOutgoingSession(StreamContext ctx,
|
||||
TransportConnectionWriter w, boolean duplex) {
|
||||
public MessagingSession createOutgoingSession(ContactId c, TransportId t,
|
||||
long maxLatency, OutputStream out, boolean duplex) {
|
||||
if(duplex) return new ReactiveOutgoingSession(db, dbExecutor, eventBus,
|
||||
streamWriterFactory, packetWriterFactory, ctx, w);
|
||||
packetWriterFactory, c, t, maxLatency, out);
|
||||
else return new SinglePassOutgoingSession(db, dbExecutor,
|
||||
streamWriterFactory, packetWriterFactory, ctx, w);
|
||||
packetWriterFactory, c, maxLatency, out);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.briarproject.api.ContactId;
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DbException;
|
||||
import org.briarproject.api.event.ContactRemovedEvent;
|
||||
@@ -42,10 +43,6 @@ import org.briarproject.api.messaging.SubscriptionAck;
|
||||
import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.messaging.TransportAck;
|
||||
import org.briarproject.api.messaging.TransportUpdate;
|
||||
import org.briarproject.api.plugins.TransportConnectionWriter;
|
||||
import org.briarproject.api.transport.StreamContext;
|
||||
import org.briarproject.api.transport.StreamWriter;
|
||||
import org.briarproject.api.transport.StreamWriterFactory;
|
||||
|
||||
/**
|
||||
* An outgoing {@link org.briarproject.api.messaging.MessagingSession
|
||||
@@ -65,41 +62,34 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final StreamWriterFactory streamWriterFactory;
|
||||
private final PacketWriterFactory packetWriterFactory;
|
||||
private final StreamContext ctx;
|
||||
private final TransportConnectionWriter transportWriter;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final long maxLatency;
|
||||
private final OutputStream out;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
|
||||
private volatile PacketWriter packetWriter = null;
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
ReactiveOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, StreamWriterFactory streamWriterFactory,
|
||||
PacketWriterFactory packetWriterFactory, StreamContext ctx,
|
||||
TransportConnectionWriter transportWriter) {
|
||||
EventBus eventBus, PacketWriterFactory packetWriterFactory,
|
||||
ContactId contactId, TransportId transportId, long maxLatency,
|
||||
OutputStream out) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.streamWriterFactory = streamWriterFactory;
|
||||
this.packetWriterFactory = packetWriterFactory;
|
||||
this.ctx = ctx;
|
||||
this.transportWriter = transportWriter;
|
||||
contactId = ctx.getContactId();
|
||||
maxLatency = transportWriter.getMaxLatency();
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.out = out;
|
||||
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
|
||||
}
|
||||
|
||||
public void run() throws IOException {
|
||||
eventBus.addListener(this);
|
||||
try {
|
||||
OutputStream out = transportWriter.getOutputStream();
|
||||
int maxFrameLength = transportWriter.getMaxFrameLength();
|
||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||
out, maxFrameLength, ctx);
|
||||
out = streamWriter.getOutputStream();
|
||||
packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
// Start a query for each type of packet, in order of urgency
|
||||
dbExecutor.execute(new GenerateTransportAcks());
|
||||
@@ -183,7 +173,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
|
||||
dbExecutor.execute(new GenerateTransportAcks());
|
||||
} else if(e instanceof TransportRemovedEvent) {
|
||||
TransportRemovedEvent t = (TransportRemovedEvent) e;
|
||||
if(ctx.getTransportId().equals(t.getTransportId())) {
|
||||
if(t.getTransportId().equals(transportId)) {
|
||||
LOG.info("Transport removed, closing");
|
||||
interrupt();
|
||||
}
|
||||
|
||||
@@ -26,10 +26,6 @@ import org.briarproject.api.messaging.SubscriptionAck;
|
||||
import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.messaging.TransportAck;
|
||||
import org.briarproject.api.messaging.TransportUpdate;
|
||||
import org.briarproject.api.plugins.TransportConnectionWriter;
|
||||
import org.briarproject.api.transport.StreamContext;
|
||||
import org.briarproject.api.transport.StreamWriter;
|
||||
import org.briarproject.api.transport.StreamWriterFactory;
|
||||
|
||||
/**
|
||||
* An outgoing {@link org.briarproject.api.messaging.MessagingSession
|
||||
@@ -48,41 +44,30 @@ class SinglePassOutgoingSession implements MessagingSession {
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
private final StreamWriterFactory streamWriterFactory;
|
||||
private final PacketWriterFactory packetWriterFactory;
|
||||
private final StreamContext ctx;
|
||||
private final TransportConnectionWriter transportWriter;
|
||||
private final ContactId contactId;
|
||||
private final long maxLatency;
|
||||
private final OutputStream out;
|
||||
private final AtomicInteger outstandingQueries;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
|
||||
private volatile StreamWriter streamWriter = null;
|
||||
private volatile PacketWriter packetWriter = null;
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
SinglePassOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
StreamWriterFactory streamWriterFactory,
|
||||
PacketWriterFactory packetWriterFactory, StreamContext ctx,
|
||||
TransportConnectionWriter transportWriter) {
|
||||
PacketWriterFactory packetWriterFactory, ContactId contactId,
|
||||
long maxLatency, OutputStream out) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.streamWriterFactory = streamWriterFactory;
|
||||
this.packetWriterFactory = packetWriterFactory;
|
||||
this.ctx = ctx;
|
||||
this.transportWriter = transportWriter;
|
||||
contactId = ctx.getContactId();
|
||||
maxLatency = transportWriter.getMaxLatency();
|
||||
this.contactId = contactId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.out = out;
|
||||
outstandingQueries = new AtomicInteger(8); // One per type of packet
|
||||
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
|
||||
}
|
||||
|
||||
public void run() throws IOException {
|
||||
OutputStream out = transportWriter.getOutputStream();
|
||||
int maxFrameLength = transportWriter.getMaxFrameLength();
|
||||
streamWriter = streamWriterFactory.createStreamWriter(out,
|
||||
maxFrameLength, ctx);
|
||||
out = streamWriter.getOutputStream();
|
||||
packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
// Start a query for each type of packet, in order of urgency
|
||||
dbExecutor.execute(new GenerateTransportAcks());
|
||||
|
||||
@@ -6,6 +6,7 @@ import static org.briarproject.api.transport.TransportConstants.TAG_LENGTH;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
@@ -24,6 +25,10 @@ import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
|
||||
import org.briarproject.api.transport.ConnectionDispatcher;
|
||||
import org.briarproject.api.transport.ConnectionRegistry;
|
||||
import org.briarproject.api.transport.StreamContext;
|
||||
import org.briarproject.api.transport.StreamReader;
|
||||
import org.briarproject.api.transport.StreamReaderFactory;
|
||||
import org.briarproject.api.transport.StreamWriter;
|
||||
import org.briarproject.api.transport.StreamWriterFactory;
|
||||
import org.briarproject.api.transport.TagRecogniser;
|
||||
|
||||
class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
@@ -34,17 +39,23 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
private final Executor ioExecutor;
|
||||
private final KeyManager keyManager;
|
||||
private final TagRecogniser tagRecogniser;
|
||||
private final StreamReaderFactory streamReaderFactory;
|
||||
private final StreamWriterFactory streamWriterFactory;
|
||||
private final MessagingSessionFactory messagingSessionFactory;
|
||||
private final ConnectionRegistry connectionRegistry;
|
||||
|
||||
@Inject
|
||||
ConnectionDispatcherImpl(@IoExecutor Executor ioExecutor,
|
||||
KeyManager keyManager, TagRecogniser tagRecogniser,
|
||||
StreamReaderFactory streamReaderFactory,
|
||||
StreamWriterFactory streamWriterFactory,
|
||||
MessagingSessionFactory messagingSessionFactory,
|
||||
ConnectionRegistry connectionRegistry) {
|
||||
this.ioExecutor = ioExecutor;
|
||||
this.keyManager = keyManager;
|
||||
this.tagRecogniser = tagRecogniser;
|
||||
this.streamReaderFactory = streamReaderFactory;
|
||||
this.streamWriterFactory = streamWriterFactory;
|
||||
this.messagingSessionFactory = messagingSessionFactory;
|
||||
this.connectionRegistry = connectionRegistry;
|
||||
}
|
||||
@@ -83,6 +94,25 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
return tag;
|
||||
}
|
||||
|
||||
private MessagingSession createIncomingSession(StreamContext ctx,
|
||||
TransportConnectionReader r) throws IOException {
|
||||
InputStream in = r.getInputStream();
|
||||
StreamReader streamReader = streamReaderFactory.createStreamReader(in,
|
||||
r.getMaxFrameLength(), ctx);
|
||||
return messagingSessionFactory.createIncomingSession(ctx.getContactId(),
|
||||
streamReader.getInputStream());
|
||||
}
|
||||
|
||||
private MessagingSession createOutgoingSession(StreamContext ctx,
|
||||
TransportConnectionWriter w, boolean duplex) throws IOException {
|
||||
OutputStream out = w.getOutputStream();
|
||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out,
|
||||
w.getMaxFrameLength(), ctx);
|
||||
return messagingSessionFactory.createOutgoingSession(ctx.getContactId(),
|
||||
ctx.getTransportId(), w.getMaxLatency(),
|
||||
streamWriter.getOutputStream(), duplex);
|
||||
}
|
||||
|
||||
private class DispatchIncomingSimplexConnection implements Runnable {
|
||||
|
||||
private final TransportId transportId;
|
||||
@@ -116,11 +146,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
}
|
||||
ContactId contactId = ctx.getContactId();
|
||||
connectionRegistry.registerConnection(contactId, transportId);
|
||||
// Run the incoming session
|
||||
MessagingSession incomingSession =
|
||||
messagingSessionFactory.createIncomingSession(ctx, reader);
|
||||
try {
|
||||
incomingSession.run();
|
||||
// Create and run the incoming session
|
||||
createIncomingSession(ctx, reader).run();
|
||||
disposeReader(false, true);
|
||||
} catch(IOException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
@@ -162,12 +190,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
return;
|
||||
}
|
||||
connectionRegistry.registerConnection(contactId, transportId);
|
||||
// Run the outgoing session
|
||||
MessagingSession outgoingSession =
|
||||
messagingSessionFactory.createOutgoingSession(ctx,
|
||||
writer, false);
|
||||
try {
|
||||
outgoingSession.run();
|
||||
// Create and run the outgoing session
|
||||
createOutgoingSession(ctx, writer, false).run();
|
||||
disposeWriter(false);
|
||||
} catch(IOException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
@@ -231,10 +256,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
runOutgoingSession();
|
||||
}
|
||||
});
|
||||
// Run the incoming session
|
||||
incomingSession = messagingSessionFactory.createIncomingSession(ctx,
|
||||
reader);
|
||||
try {
|
||||
// Create and run the incoming session
|
||||
incomingSession = createIncomingSession(ctx, reader);
|
||||
incomingSession.run();
|
||||
disposeReader(false, true);
|
||||
} catch(IOException e) {
|
||||
@@ -254,10 +278,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
disposeWriter(true);
|
||||
return;
|
||||
}
|
||||
// Run the outgoing session
|
||||
outgoingSession = messagingSessionFactory.createOutgoingSession(ctx,
|
||||
writer, true);
|
||||
try {
|
||||
// Create and run the outgoing session
|
||||
outgoingSession = createOutgoingSession(ctx, writer, true);
|
||||
outgoingSession.run();
|
||||
disposeWriter(false);
|
||||
} catch(IOException e) {
|
||||
@@ -321,10 +344,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
runIncomingSession();
|
||||
}
|
||||
});
|
||||
// Run the outgoing session
|
||||
outgoingSession = messagingSessionFactory.createOutgoingSession(ctx,
|
||||
writer, true);
|
||||
try {
|
||||
// Create and run the outgoing session
|
||||
outgoingSession = createOutgoingSession(ctx, writer, true);
|
||||
outgoingSession.run();
|
||||
disposeWriter(false);
|
||||
} catch(IOException e) {
|
||||
@@ -362,10 +384,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
disposeReader(true, true);
|
||||
return;
|
||||
}
|
||||
// Run the incoming session
|
||||
incomingSession = messagingSessionFactory.createIncomingSession(ctx,
|
||||
reader);
|
||||
try {
|
||||
// Create and run the incoming session
|
||||
incomingSession = createIncomingSession(ctx, reader);
|
||||
incomingSession.run();
|
||||
disposeReader(false, true);
|
||||
} catch(IOException e) {
|
||||
|
||||
Reference in New Issue
Block a user