Messaging sessions aren't responsible for closing their streams.

The TransportReader/Writer's dispose() method should handle that, and
ConnectionManager is responsible for calling it.
This commit is contained in:
akwizgran
2014-11-06 13:13:23 +00:00
parent b27a17db88
commit 1d20761123
4 changed files with 30 additions and 21 deletions

View File

@@ -109,10 +109,10 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
ThrowingRunnable<IOException> task = writerTasks.take(); ThrowingRunnable<IOException> task = writerTasks.take();
if(task == CLOSE) break; if(task == CLOSE) break;
task.run(); task.run();
// Flush the stream if it's going to be idle
if(writerTasks.isEmpty()) out.flush(); if(writerTasks.isEmpty()) out.flush();
} }
out.flush(); out.flush();
out.close();
} catch(InterruptedException e) { } catch(InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write"); LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -206,6 +206,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeAck(ack); packetWriter.writeAck(ack);
LOG.info("Sent ack"); LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateAck());
@@ -240,6 +241,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
for(byte[] raw : batch) packetWriter.writeMessage(raw); for(byte[] raw : batch) packetWriter.writeMessage(raw);
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
@@ -275,6 +277,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeOffer(offer); packetWriter.writeOffer(offer);
LOG.info("Sent offer"); LOG.info("Sent offer");
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
@@ -310,6 +313,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeRequest(request); packetWriter.writeRequest(request);
LOG.info("Sent request"); LOG.info("Sent request");
dbExecutor.execute(new GenerateRequest()); dbExecutor.execute(new GenerateRequest());
@@ -344,6 +348,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeRetentionAck(ack); packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack"); LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck()); dbExecutor.execute(new GenerateRetentionAck());
@@ -379,6 +384,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeRetentionUpdate(update); packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update"); LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate()); dbExecutor.execute(new GenerateRetentionUpdate());
@@ -413,6 +419,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeSubscriptionAck(ack); packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack"); LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck()); dbExecutor.execute(new GenerateSubscriptionAck());
@@ -448,6 +455,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeSubscriptionUpdate(update); packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update"); LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate()); dbExecutor.execute(new GenerateSubscriptionUpdate());
@@ -482,6 +490,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
for(TransportAck a : acks) packetWriter.writeTransportAck(a); for(TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks"); LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportAcks());
@@ -517,6 +526,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
for(TransportUpdate u : updates) for(TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u); packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates"); LOG.info("Sent transport updates");

View File

@@ -48,7 +48,6 @@ class IncomingSession implements MessagingSession, EventListener {
private final MessageVerifier messageVerifier; private final MessageVerifier messageVerifier;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
private final InputStream in;
private final PacketReader packetReader; private final PacketReader packetReader;
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
@@ -65,7 +64,6 @@ class IncomingSession implements MessagingSession, EventListener {
this.messageVerifier = messageVerifier; this.messageVerifier = messageVerifier;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
this.in = in;
packetReader = packetReaderFactory.createPacketReader(in); packetReader = packetReaderFactory.createPacketReader(in);
} }
@@ -102,7 +100,6 @@ class IncomingSession implements MessagingSession, EventListener {
throw new FormatException(); throw new FormatException();
} }
} }
in.close();
} finally { } finally {
eventBus.removeListener(this); eventBus.removeListener(this);
} }

View File

@@ -99,7 +99,6 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
task.run(); task.run();
} }
out.flush(); out.flush();
out.close();
} catch(InterruptedException e) { } catch(InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write"); LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -159,6 +158,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeAck(ack); packetWriter.writeAck(ack);
LOG.info("Sent ack"); LOG.info("Sent ack");
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateAck());
@@ -194,6 +194,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
for(byte[] raw : batch) packetWriter.writeMessage(raw); for(byte[] raw : batch) packetWriter.writeMessage(raw);
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
@@ -229,6 +230,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeRetentionAck(ack); packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack"); LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck()); dbExecutor.execute(new GenerateRetentionAck());
@@ -265,6 +267,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeRetentionUpdate(update); packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update"); LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate()); dbExecutor.execute(new GenerateRetentionUpdate());
@@ -300,6 +303,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeSubscriptionAck(ack); packetWriter.writeSubscriptionAck(ack);
LOG.info("Sent subscription ack"); LOG.info("Sent subscription ack");
dbExecutor.execute(new GenerateSubscriptionAck()); dbExecutor.execute(new GenerateSubscriptionAck());
@@ -336,6 +340,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
packetWriter.writeSubscriptionUpdate(update); packetWriter.writeSubscriptionUpdate(update);
LOG.info("Sent subscription update"); LOG.info("Sent subscription update");
dbExecutor.execute(new GenerateSubscriptionUpdate()); dbExecutor.execute(new GenerateSubscriptionUpdate());
@@ -371,6 +376,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
for(TransportAck a : acks) packetWriter.writeTransportAck(a); for(TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks"); LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportAcks());
@@ -407,6 +413,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
} }
public void run() throws IOException { public void run() throws IOException {
if(interrupted) return;
for(TransportUpdate u : updates) for(TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u); packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates"); LOG.info("Sent transport updates");

View File

@@ -93,13 +93,6 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
new TransportModule()); new TransportModule());
} }
@Test
public void testInjection() {
DatabaseComponent aliceDb = alice.getInstance(DatabaseComponent.class);
DatabaseComponent bobDb = bob.getInstance(DatabaseComponent.class);
assertFalse(aliceDb == bobDb);
}
@Test @Test
public void testWriteAndRead() throws Exception { public void testWriteAndRead() throws Exception {
read(write()); read(write());
@@ -110,8 +103,8 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
DatabaseComponent db = alice.getInstance(DatabaseComponent.class); DatabaseComponent db = alice.getInstance(DatabaseComponent.class);
assertFalse(db.open()); assertFalse(db.open());
// Start Alice's key manager // Start Alice's key manager
KeyManager km = alice.getInstance(KeyManager.class); KeyManager keyManager = alice.getInstance(KeyManager.class);
km.start(); keyManager.start();
// Add a local pseudonym for Alice // Add a local pseudonym for Alice
AuthorId aliceId = new AuthorId(TestUtils.getRandomId()); AuthorId aliceId = new AuthorId(TestUtils.getRandomId());
LocalAuthor aliceAuthor = new LocalAuthor(aliceId, "Alice", LocalAuthor aliceAuthor = new LocalAuthor(aliceId, "Alice",
@@ -131,7 +124,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
db.addTransport(transportId, LATENCY); db.addTransport(transportId, LATENCY);
Endpoint ep = new Endpoint(contactId, transportId, epoch, true); Endpoint ep = new Endpoint(contactId, transportId, epoch, true);
db.addEndpoint(ep); db.addEndpoint(ep);
km.endpointAdded(ep, LATENCY, initialSecret.clone()); keyManager.endpointAdded(ep, LATENCY, initialSecret.clone());
// Send Bob a message // Send Bob a message
String contentType = "text/plain"; String contentType = "text/plain";
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
@@ -144,7 +137,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
StreamWriterFactory streamWriterFactory = StreamWriterFactory streamWriterFactory =
alice.getInstance(StreamWriterFactory.class); alice.getInstance(StreamWriterFactory.class);
StreamContext ctx = km.getStreamContext(contactId, transportId); StreamContext ctx = keyManager.getStreamContext(contactId, transportId);
assertNotNull(ctx); assertNotNull(ctx);
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out, StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out,
MAX_FRAME_LENGTH, ctx); MAX_FRAME_LENGTH, ctx);
@@ -158,8 +151,9 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
streamWriter.getOutputStream()); streamWriter.getOutputStream());
// Write whatever needs to be written // Write whatever needs to be written
session.run(); session.run();
streamWriter.getOutputStream().close();
// Clean up // Clean up
km.stop(); keyManager.stop();
db.close(); db.close();
// Return the contents of the stream // Return the contents of the stream
return out.toByteArray(); return out.toByteArray();
@@ -170,8 +164,8 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
DatabaseComponent db = bob.getInstance(DatabaseComponent.class); DatabaseComponent db = bob.getInstance(DatabaseComponent.class);
assertFalse(db.open()); assertFalse(db.open());
// Start Bob's key manager // Start Bob's key manager
KeyManager km = bob.getInstance(KeyManager.class); KeyManager keyManager = bob.getInstance(KeyManager.class);
km.start(); keyManager.start();
// Add a local pseudonym for Bob // Add a local pseudonym for Bob
AuthorId bobId = new AuthorId(TestUtils.getRandomId()); AuthorId bobId = new AuthorId(TestUtils.getRandomId());
LocalAuthor bobAuthor = new LocalAuthor(bobId, "Bob", LocalAuthor bobAuthor = new LocalAuthor(bobId, "Bob",
@@ -191,7 +185,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
db.addTransport(transportId, LATENCY); db.addTransport(transportId, LATENCY);
Endpoint ep = new Endpoint(contactId, transportId, epoch, false); Endpoint ep = new Endpoint(contactId, transportId, epoch, false);
db.addEndpoint(ep); db.addEndpoint(ep);
km.endpointAdded(ep, LATENCY, initialSecret.clone()); keyManager.endpointAdded(ep, LATENCY, initialSecret.clone());
// Set up an event listener // Set up an event listener
MessageListener listener = new MessageListener(); MessageListener listener = new MessageListener();
bob.getInstance(EventBus.class).addListener(listener); bob.getInstance(EventBus.class).addListener(listener);
@@ -222,10 +216,11 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
assertFalse(listener.messageAdded); assertFalse(listener.messageAdded);
// Read whatever needs to be read // Read whatever needs to be read
session.run(); session.run();
streamReader.getInputStream().close();
// The private message from Alice should have been added // The private message from Alice should have been added
assertTrue(listener.messageAdded); assertTrue(listener.messageAdded);
// Clean up // Clean up
km.stop(); keyManager.stop();
db.close(); db.close();
} }