mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-15 20:29:52 +01:00
Interrupt messaging session if contact or transport is removed.
This commit is contained in:
@@ -46,13 +46,15 @@ import org.briarproject.api.messaging.TransportUpdate;
|
||||
|
||||
/**
|
||||
* An outgoing {@link org.briarproject.api.messaging.MessagingSession
|
||||
* MessagingSession} that keeps its output stream open and reacts to events
|
||||
* that make packets available to send.
|
||||
* MessagingSession} suitable for duplex transports. The session offers
|
||||
* messages before sending them, keeps its output stream open when there are no
|
||||
* more packets to send, and reacts to events that make packets available to
|
||||
* send.
|
||||
*/
|
||||
class ReactiveOutgoingSession implements MessagingSession, EventListener {
|
||||
class DuplexOutgoingSession implements MessagingSession, EventListener {
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(ReactiveOutgoingSession.class.getName());
|
||||
Logger.getLogger(DuplexOutgoingSession.class.getName());
|
||||
|
||||
private static final ThrowingRunnable<IOException> CLOSE =
|
||||
new ThrowingRunnable<IOException>() {
|
||||
@@ -62,35 +64,33 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final PacketWriterFactory packetWriterFactory;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final long maxLatency;
|
||||
private final OutputStream out;
|
||||
private final PacketWriter packetWriter;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
|
||||
private volatile PacketWriter packetWriter = null;
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
ReactiveOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, PacketWriterFactory packetWriterFactory,
|
||||
ContactId contactId, TransportId transportId, long maxLatency,
|
||||
OutputStream out) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.packetWriterFactory = packetWriterFactory;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.out = out;
|
||||
packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
|
||||
}
|
||||
|
||||
public void run() throws IOException {
|
||||
eventBus.addListener(this);
|
||||
try {
|
||||
packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
// Start a query for each type of packet, in order of urgency
|
||||
dbExecutor.execute(new GenerateTransportAcks());
|
||||
dbExecutor.execute(new GenerateTransportUpdates());
|
||||
@@ -110,6 +110,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
|
||||
task.run();
|
||||
if(writerTasks.isEmpty()) out.flush();
|
||||
}
|
||||
out.flush();
|
||||
out.close();
|
||||
} catch(InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for a packet to write");
|
||||
@@ -128,10 +129,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
|
||||
public void eventOccurred(Event e) {
|
||||
if(e instanceof ContactRemovedEvent) {
|
||||
ContactRemovedEvent c = (ContactRemovedEvent) e;
|
||||
if(contactId.equals(c.getContactId())) {
|
||||
LOG.info("Contact removed, closing");
|
||||
interrupt();
|
||||
}
|
||||
if(c.getContactId().equals(contactId)) interrupt();
|
||||
} else if(e instanceof MessageAddedEvent) {
|
||||
dbExecutor.execute(new GenerateOffer());
|
||||
} else if(e instanceof MessageExpiredEvent) {
|
||||
@@ -173,10 +171,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
|
||||
dbExecutor.execute(new GenerateTransportAcks());
|
||||
} else if(e instanceof TransportRemovedEvent) {
|
||||
TransportRemovedEvent t = (TransportRemovedEvent) e;
|
||||
if(t.getTransportId().equals(transportId)) {
|
||||
LOG.info("Transport removed, closing");
|
||||
interrupt();
|
||||
}
|
||||
if(t.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,8 +10,14 @@ import java.util.logging.Logger;
|
||||
|
||||
import org.briarproject.api.ContactId;
|
||||
import org.briarproject.api.FormatException;
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DbException;
|
||||
import org.briarproject.api.event.ContactRemovedEvent;
|
||||
import org.briarproject.api.event.Event;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.event.EventListener;
|
||||
import org.briarproject.api.event.TransportRemovedEvent;
|
||||
import org.briarproject.api.messaging.Ack;
|
||||
import org.briarproject.api.messaging.Message;
|
||||
import org.briarproject.api.messaging.MessageVerifier;
|
||||
@@ -30,66 +36,75 @@ import org.briarproject.api.messaging.UnverifiedMessage;
|
||||
* An incoming {@link org.briarproject.api.messaging.MessagingSession
|
||||
* MessagingSession}.
|
||||
*/
|
||||
class IncomingSession implements MessagingSession {
|
||||
class IncomingSession implements MessagingSession, EventListener {
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(IncomingSession.class.getName());
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor, cryptoExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final MessageVerifier messageVerifier;
|
||||
private final PacketReaderFactory packetReaderFactory;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final InputStream in;
|
||||
private final PacketReader packetReader;
|
||||
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
IncomingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
Executor cryptoExecutor, MessageVerifier messageVerifier,
|
||||
Executor cryptoExecutor, EventBus eventBus,
|
||||
MessageVerifier messageVerifier,
|
||||
PacketReaderFactory packetReaderFactory, ContactId contactId,
|
||||
InputStream in) {
|
||||
TransportId transportId, InputStream in) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.cryptoExecutor = cryptoExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.messageVerifier = messageVerifier;
|
||||
this.packetReaderFactory = packetReaderFactory;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.in = in;
|
||||
packetReader = packetReaderFactory.createPacketReader(in);
|
||||
}
|
||||
|
||||
public void run() throws IOException {
|
||||
PacketReader packetReader = packetReaderFactory.createPacketReader(in);
|
||||
// Read packets until interrupted or EOF
|
||||
while(!interrupted && !packetReader.eof()) {
|
||||
if(packetReader.hasAck()) {
|
||||
Ack a = packetReader.readAck();
|
||||
dbExecutor.execute(new ReceiveAck(a));
|
||||
} else if(packetReader.hasMessage()) {
|
||||
UnverifiedMessage m = packetReader.readMessage();
|
||||
cryptoExecutor.execute(new VerifyMessage(m));
|
||||
} else if(packetReader.hasRetentionAck()) {
|
||||
RetentionAck a = packetReader.readRetentionAck();
|
||||
dbExecutor.execute(new ReceiveRetentionAck(a));
|
||||
} else if(packetReader.hasRetentionUpdate()) {
|
||||
RetentionUpdate u = packetReader.readRetentionUpdate();
|
||||
dbExecutor.execute(new ReceiveRetentionUpdate(u));
|
||||
} else if(packetReader.hasSubscriptionAck()) {
|
||||
SubscriptionAck a = packetReader.readSubscriptionAck();
|
||||
dbExecutor.execute(new ReceiveSubscriptionAck(a));
|
||||
} else if(packetReader.hasSubscriptionUpdate()) {
|
||||
SubscriptionUpdate u = packetReader.readSubscriptionUpdate();
|
||||
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
|
||||
} else if(packetReader.hasTransportAck()) {
|
||||
TransportAck a = packetReader.readTransportAck();
|
||||
dbExecutor.execute(new ReceiveTransportAck(a));
|
||||
} else if(packetReader.hasTransportUpdate()) {
|
||||
TransportUpdate u = packetReader.readTransportUpdate();
|
||||
dbExecutor.execute(new ReceiveTransportUpdate(u));
|
||||
} else {
|
||||
throw new FormatException();
|
||||
eventBus.addListener(this);
|
||||
try {
|
||||
// Read packets until interrupted or EOF
|
||||
while(!interrupted && !packetReader.eof()) {
|
||||
if(packetReader.hasAck()) {
|
||||
Ack a = packetReader.readAck();
|
||||
dbExecutor.execute(new ReceiveAck(a));
|
||||
} else if(packetReader.hasMessage()) {
|
||||
UnverifiedMessage m = packetReader.readMessage();
|
||||
cryptoExecutor.execute(new VerifyMessage(m));
|
||||
} else if(packetReader.hasRetentionAck()) {
|
||||
RetentionAck a = packetReader.readRetentionAck();
|
||||
dbExecutor.execute(new ReceiveRetentionAck(a));
|
||||
} else if(packetReader.hasRetentionUpdate()) {
|
||||
RetentionUpdate u = packetReader.readRetentionUpdate();
|
||||
dbExecutor.execute(new ReceiveRetentionUpdate(u));
|
||||
} else if(packetReader.hasSubscriptionAck()) {
|
||||
SubscriptionAck a = packetReader.readSubscriptionAck();
|
||||
dbExecutor.execute(new ReceiveSubscriptionAck(a));
|
||||
} else if(packetReader.hasSubscriptionUpdate()) {
|
||||
SubscriptionUpdate u = packetReader.readSubscriptionUpdate();
|
||||
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
|
||||
} else if(packetReader.hasTransportAck()) {
|
||||
TransportAck a = packetReader.readTransportAck();
|
||||
dbExecutor.execute(new ReceiveTransportAck(a));
|
||||
} else if(packetReader.hasTransportUpdate()) {
|
||||
TransportUpdate u = packetReader.readTransportUpdate();
|
||||
dbExecutor.execute(new ReceiveTransportUpdate(u));
|
||||
} else {
|
||||
throw new FormatException();
|
||||
}
|
||||
}
|
||||
in.close();
|
||||
} finally {
|
||||
eventBus.removeListener(this);
|
||||
}
|
||||
in.close();
|
||||
}
|
||||
|
||||
public void interrupt() {
|
||||
@@ -98,6 +113,16 @@ class IncomingSession implements MessagingSession {
|
||||
interrupted = true;
|
||||
}
|
||||
|
||||
public void eventOccurred(Event e) {
|
||||
if(e instanceof ContactRemovedEvent) {
|
||||
ContactRemovedEvent c = (ContactRemovedEvent) e;
|
||||
if(c.getContactId().equals(contactId)) interrupt();
|
||||
} else if(e instanceof TransportRemovedEvent) {
|
||||
TransportRemovedEvent t = (TransportRemovedEvent) e;
|
||||
if(t.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private class ReceiveAck implements Runnable {
|
||||
|
||||
private final Ack ack;
|
||||
|
||||
@@ -43,16 +43,17 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
|
||||
this.packetWriterFactory = packetWriterFactory;
|
||||
}
|
||||
|
||||
public MessagingSession createIncomingSession(ContactId c, InputStream in) {
|
||||
return new IncomingSession(db, dbExecutor, cryptoExecutor,
|
||||
messageVerifier, packetReaderFactory, c, in);
|
||||
public MessagingSession createIncomingSession(ContactId c, TransportId t,
|
||||
InputStream in) {
|
||||
return new IncomingSession(db, dbExecutor, cryptoExecutor, eventBus,
|
||||
messageVerifier, packetReaderFactory, c, t, in);
|
||||
}
|
||||
|
||||
public MessagingSession createOutgoingSession(ContactId c, TransportId t,
|
||||
long maxLatency, OutputStream out, boolean duplex) {
|
||||
if(duplex) return new ReactiveOutgoingSession(db, dbExecutor, eventBus,
|
||||
long maxLatency, boolean duplex, OutputStream out) {
|
||||
if(duplex) return new DuplexOutgoingSession(db, dbExecutor, eventBus,
|
||||
packetWriterFactory, c, t, maxLatency, out);
|
||||
else return new SimplexOutgoingSession(db, dbExecutor, eventBus,
|
||||
packetWriterFactory, c, t, maxLatency, out);
|
||||
else return new SinglePassOutgoingSession(db, dbExecutor,
|
||||
packetWriterFactory, c, maxLatency, out);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,8 +14,14 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.briarproject.api.ContactId;
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DbException;
|
||||
import org.briarproject.api.event.ContactRemovedEvent;
|
||||
import org.briarproject.api.event.Event;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.event.EventListener;
|
||||
import org.briarproject.api.event.TransportRemovedEvent;
|
||||
import org.briarproject.api.messaging.Ack;
|
||||
import org.briarproject.api.messaging.MessagingSession;
|
||||
import org.briarproject.api.messaging.PacketWriter;
|
||||
@@ -29,13 +35,14 @@ import org.briarproject.api.messaging.TransportUpdate;
|
||||
|
||||
/**
|
||||
* An outgoing {@link org.briarproject.api.messaging.MessagingSession
|
||||
* MessagingSession} that closes its output stream when no more packets are
|
||||
* available to send.
|
||||
* MessagingSession} suitable for simplex transports. The session sends
|
||||
* messages without offering them, and closes its output stream when there are
|
||||
* no more packets to send.
|
||||
*/
|
||||
class SinglePassOutgoingSession implements MessagingSession {
|
||||
class SimplexOutgoingSession implements MessagingSession, EventListener {
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(SinglePassOutgoingSession.class.getName());
|
||||
Logger.getLogger(SimplexOutgoingSession.class.getName());
|
||||
|
||||
private static final ThrowingRunnable<IOException> CLOSE =
|
||||
new ThrowingRunnable<IOException>() {
|
||||
@@ -44,52 +51,60 @@ class SinglePassOutgoingSession implements MessagingSession {
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
private final PacketWriterFactory packetWriterFactory;
|
||||
private final EventBus eventBus;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final long maxLatency;
|
||||
private final OutputStream out;
|
||||
private final PacketWriter packetWriter;
|
||||
private final AtomicInteger outstandingQueries;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
|
||||
private volatile PacketWriter packetWriter = null;
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
SinglePassOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
PacketWriterFactory packetWriterFactory, ContactId contactId,
|
||||
long maxLatency, OutputStream out) {
|
||||
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, PacketWriterFactory packetWriterFactory,
|
||||
ContactId contactId, TransportId transportId, long maxLatency,
|
||||
OutputStream out) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.packetWriterFactory = packetWriterFactory;
|
||||
this.eventBus = eventBus;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.out = out;
|
||||
packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
outstandingQueries = new AtomicInteger(8); // One per type of packet
|
||||
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
|
||||
}
|
||||
|
||||
public void run() throws IOException {
|
||||
packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
// Start a query for each type of packet, in order of urgency
|
||||
dbExecutor.execute(new GenerateTransportAcks());
|
||||
dbExecutor.execute(new GenerateTransportUpdates());
|
||||
dbExecutor.execute(new GenerateSubscriptionAck());
|
||||
dbExecutor.execute(new GenerateSubscriptionUpdate());
|
||||
dbExecutor.execute(new GenerateRetentionAck());
|
||||
dbExecutor.execute(new GenerateRetentionUpdate());
|
||||
dbExecutor.execute(new GenerateAck());
|
||||
dbExecutor.execute(new GenerateBatch());
|
||||
// Write packets until interrupted or there are no more packets to write
|
||||
eventBus.addListener(this);
|
||||
try {
|
||||
while(!interrupted) {
|
||||
ThrowingRunnable<IOException> task = writerTasks.take();
|
||||
if(task == CLOSE) break;
|
||||
task.run();
|
||||
// Start a query for each type of packet, in order of urgency
|
||||
dbExecutor.execute(new GenerateTransportAcks());
|
||||
dbExecutor.execute(new GenerateTransportUpdates());
|
||||
dbExecutor.execute(new GenerateSubscriptionAck());
|
||||
dbExecutor.execute(new GenerateSubscriptionUpdate());
|
||||
dbExecutor.execute(new GenerateRetentionAck());
|
||||
dbExecutor.execute(new GenerateRetentionUpdate());
|
||||
dbExecutor.execute(new GenerateAck());
|
||||
dbExecutor.execute(new GenerateBatch());
|
||||
// Write packets until interrupted or no more packets to write
|
||||
try {
|
||||
while(!interrupted) {
|
||||
ThrowingRunnable<IOException> task = writerTasks.take();
|
||||
if(task == CLOSE) break;
|
||||
task.run();
|
||||
}
|
||||
out.flush();
|
||||
out.close();
|
||||
} catch(InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for a packet to write");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
out.flush();
|
||||
out.close();
|
||||
} catch(InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for a packet to write");
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
eventBus.removeListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,6 +117,16 @@ class SinglePassOutgoingSession implements MessagingSession {
|
||||
if(outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE);
|
||||
}
|
||||
|
||||
public void eventOccurred(Event e) {
|
||||
if(e instanceof ContactRemovedEvent) {
|
||||
ContactRemovedEvent c = (ContactRemovedEvent) e;
|
||||
if(c.getContactId().equals(contactId)) interrupt();
|
||||
} else if(e instanceof TransportRemovedEvent) {
|
||||
TransportRemovedEvent t = (TransportRemovedEvent) e;
|
||||
if(t.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
// This task runs on the database thread
|
||||
private class GenerateAck implements Runnable {
|
||||
|
||||
@@ -100,7 +100,7 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
StreamReader streamReader = streamReaderFactory.createStreamReader(in,
|
||||
r.getMaxFrameLength(), ctx);
|
||||
return messagingSessionFactory.createIncomingSession(ctx.getContactId(),
|
||||
streamReader.getInputStream());
|
||||
ctx.getTransportId(), streamReader.getInputStream());
|
||||
}
|
||||
|
||||
private MessagingSession createOutgoingSession(StreamContext ctx,
|
||||
@@ -110,7 +110,7 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
w.getMaxFrameLength(), ctx);
|
||||
return messagingSessionFactory.createOutgoingSession(ctx.getContactId(),
|
||||
ctx.getTransportId(), w.getMaxLatency(),
|
||||
streamWriter.getOutputStream(), duplex);
|
||||
duplex, streamWriter.getOutputStream());
|
||||
}
|
||||
|
||||
private class DispatchIncomingSimplexConnection implements Runnable {
|
||||
|
||||
Reference in New Issue
Block a user