PacketWriters aren't responsible for flushing their output streams.

This commit is contained in:
akwizgran
2014-11-04 17:03:06 +00:00
parent 1151c35d1c
commit 33c3eb7308
8 changed files with 15 additions and 39 deletions

View File

@@ -29,8 +29,4 @@ public interface PacketWriter {
void writeTransportAck(TransportAck a) throws IOException; void writeTransportAck(TransportAck a) throws IOException;
void writeTransportUpdate(TransportUpdate u) throws IOException; void writeTransportUpdate(TransportUpdate u) throws IOException;
void flush() throws IOException;
void close() throws IOException;
} }

View File

@@ -4,5 +4,5 @@ import java.io.OutputStream;
public interface PacketWriterFactory { public interface PacketWriterFactory {
PacketWriter createPacketWriter(OutputStream out, boolean flush); PacketWriter createPacketWriter(OutputStream out);
} }

View File

@@ -21,7 +21,7 @@ class PacketWriterFactoryImpl implements PacketWriterFactory {
this.writerFactory = writerFactory; this.writerFactory = writerFactory;
} }
public PacketWriter createPacketWriter(OutputStream out, boolean flush) { public PacketWriter createPacketWriter(OutputStream out) {
return new PacketWriterImpl(serial, writerFactory, out, flush); return new PacketWriterImpl(serial, writerFactory, out);
} }
} }

View File

@@ -36,14 +36,12 @@ class PacketWriterImpl implements PacketWriter {
private final SerialComponent serial; private final SerialComponent serial;
private final OutputStream out; private final OutputStream out;
private final boolean flush;
private final Writer w; private final Writer w;
PacketWriterImpl(SerialComponent serial, WriterFactory writerFactory, PacketWriterImpl(SerialComponent serial, WriterFactory writerFactory,
OutputStream out, boolean flush) { OutputStream out) {
this.serial = serial; this.serial = serial;
this.out = out; this.out = out;
this.flush = flush;
w = writerFactory.createWriter(out); w = writerFactory.createWriter(out);
} }
@@ -75,12 +73,10 @@ class PacketWriterImpl implements PacketWriter {
for(MessageId m : a.getMessageIds()) w.writeBytes(m.getBytes()); for(MessageId m : a.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd(); w.writeListEnd();
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeMessage(byte[] raw) throws IOException { public void writeMessage(byte[] raw) throws IOException {
out.write(raw); out.write(raw);
if(flush) out.flush();
} }
public void writeOffer(Offer o) throws IOException { public void writeOffer(Offer o) throws IOException {
@@ -89,7 +85,6 @@ class PacketWriterImpl implements PacketWriter {
for(MessageId m : o.getMessageIds()) w.writeBytes(m.getBytes()); for(MessageId m : o.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd(); w.writeListEnd();
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeRequest(Request r) throws IOException { public void writeRequest(Request r) throws IOException {
@@ -98,14 +93,12 @@ class PacketWriterImpl implements PacketWriter {
for(MessageId m : r.getMessageIds()) w.writeBytes(m.getBytes()); for(MessageId m : r.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd(); w.writeListEnd();
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeRetentionAck(RetentionAck a) throws IOException { public void writeRetentionAck(RetentionAck a) throws IOException {
w.writeStructStart(RETENTION_ACK); w.writeStructStart(RETENTION_ACK);
w.writeInteger(a.getVersion()); w.writeInteger(a.getVersion());
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeRetentionUpdate(RetentionUpdate u) throws IOException { public void writeRetentionUpdate(RetentionUpdate u) throws IOException {
@@ -113,14 +106,12 @@ class PacketWriterImpl implements PacketWriter {
w.writeInteger(u.getRetentionTime()); w.writeInteger(u.getRetentionTime());
w.writeInteger(u.getVersion()); w.writeInteger(u.getVersion());
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeSubscriptionAck(SubscriptionAck a) throws IOException { public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
w.writeStructStart(SUBSCRIPTION_ACK); w.writeStructStart(SUBSCRIPTION_ACK);
w.writeInteger(a.getVersion()); w.writeInteger(a.getVersion());
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeSubscriptionUpdate(SubscriptionUpdate u) public void writeSubscriptionUpdate(SubscriptionUpdate u)
@@ -136,7 +127,6 @@ class PacketWriterImpl implements PacketWriter {
w.writeListEnd(); w.writeListEnd();
w.writeInteger(u.getVersion()); w.writeInteger(u.getVersion());
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeTransportAck(TransportAck a) throws IOException { public void writeTransportAck(TransportAck a) throws IOException {
@@ -144,7 +134,6 @@ class PacketWriterImpl implements PacketWriter {
w.writeString(a.getId().getString()); w.writeString(a.getId().getString());
w.writeInteger(a.getVersion()); w.writeInteger(a.getVersion());
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
} }
public void writeTransportUpdate(TransportUpdate u) throws IOException { public void writeTransportUpdate(TransportUpdate u) throws IOException {
@@ -153,14 +142,5 @@ class PacketWriterImpl implements PacketWriter {
w.writeMap(u.getProperties()); w.writeMap(u.getProperties());
w.writeInteger(u.getVersion()); w.writeInteger(u.getVersion());
w.writeStructEnd(); w.writeStructEnd();
if(flush) out.flush();
}
public void flush() throws IOException {
out.flush();
}
public void close() throws IOException {
out.close();
} }
} }

View File

@@ -100,7 +100,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter( StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
out, maxFrameLength, ctx); out, maxFrameLength, ctx);
out = streamWriter.getOutputStream(); out = streamWriter.getOutputStream();
packetWriter = packetWriterFactory.createPacketWriter(out, true); packetWriter = packetWriterFactory.createPacketWriter(out);
// Start a query for each type of packet, in order of urgency // Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates()); dbExecutor.execute(new GenerateTransportUpdates());
@@ -118,8 +118,8 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
ThrowingRunnable<IOException> task = writerTasks.take(); ThrowingRunnable<IOException> task = writerTasks.take();
if(task == CLOSE) break; if(task == CLOSE) break;
task.run(); task.run();
if(writerTasks.isEmpty()) out.flush();
} }
out.flush();
out.close(); out.close();
} catch(InterruptedException e) { } catch(InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write"); LOG.info("Interrupted while waiting for a packet to write");

View File

@@ -83,7 +83,7 @@ class SinglePassOutgoingSession implements MessagingSession {
streamWriter = streamWriterFactory.createStreamWriter(out, streamWriter = streamWriterFactory.createStreamWriter(out,
maxFrameLength, ctx); maxFrameLength, ctx);
out = streamWriter.getOutputStream(); out = streamWriter.getOutputStream();
packetWriter = packetWriterFactory.createPacketWriter(out, false); packetWriter = packetWriterFactory.createPacketWriter(out);
// Start a query for each type of packet, in order of urgency // Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates()); dbExecutor.execute(new GenerateTransportUpdates());

View File

@@ -125,8 +125,8 @@ public class ProtocolIntegrationTest extends BriarTestCase {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out, StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out,
MAX_FRAME_LENGTH, ctx); MAX_FRAME_LENGTH, ctx);
OutputStream out1 = streamWriter.getOutputStream(); OutputStream out1 = streamWriter.getOutputStream();
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out1, PacketWriter packetWriter =
false); packetWriterFactory.createPacketWriter(out1);
packetWriter.writeAck(new Ack(messageIds)); packetWriter.writeAck(new Ack(messageIds));
@@ -144,7 +144,7 @@ public class ProtocolIntegrationTest extends BriarTestCase {
transportProperties, 1); transportProperties, 1);
packetWriter.writeTransportUpdate(tu); packetWriter.writeTransportUpdate(tu);
packetWriter.flush(); out1.flush();
return out.toByteArray(); return out.toByteArray();
} }

View File

@@ -179,7 +179,7 @@ public class ConstantsTest extends BriarTestCase {
TransportUpdate u = new TransportUpdate(id, p, Long.MAX_VALUE); TransportUpdate u = new TransportUpdate(id, p, Long.MAX_VALUE);
// Serialise the update // Serialise the update
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
PacketWriter writer = packetWriterFactory.createPacketWriter(out, true); PacketWriter writer = packetWriterFactory.createPacketWriter(out);
writer.writeTransportUpdate(u); writer.writeTransportUpdate(u);
// Check the size of the serialised transport update // Check the size of the serialised transport update
assertTrue(out.size() <= MAX_PACKET_LENGTH); assertTrue(out.size() <= MAX_PACKET_LENGTH);
@@ -197,7 +197,7 @@ public class ConstantsTest extends BriarTestCase {
SubscriptionUpdate u = new SubscriptionUpdate(groups, Long.MAX_VALUE); SubscriptionUpdate u = new SubscriptionUpdate(groups, Long.MAX_VALUE);
// Serialise the update // Serialise the update
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
PacketWriter writer = packetWriterFactory.createPacketWriter(out, true); PacketWriter writer = packetWriterFactory.createPacketWriter(out);
writer.writeSubscriptionUpdate(u); writer.writeSubscriptionUpdate(u);
// Check the size of the serialised subscription update // Check the size of the serialised subscription update
assertTrue(out.size() <= MAX_PACKET_LENGTH); assertTrue(out.size() <= MAX_PACKET_LENGTH);
@@ -206,7 +206,7 @@ public class ConstantsTest extends BriarTestCase {
private void testMessageIdsFitIntoAck(int length) throws Exception { private void testMessageIdsFitIntoAck(int length) throws Exception {
// Create an ack with as many message IDs as possible // Create an ack with as many message IDs as possible
ByteArrayOutputStream out = new ByteArrayOutputStream(length); ByteArrayOutputStream out = new ByteArrayOutputStream(length);
PacketWriter writer = packetWriterFactory.createPacketWriter(out, true); PacketWriter writer = packetWriterFactory.createPacketWriter(out);
int maxMessages = writer.getMaxMessagesForAck(length); int maxMessages = writer.getMaxMessagesForAck(length);
Collection<MessageId> ids = new ArrayList<MessageId>(); Collection<MessageId> ids = new ArrayList<MessageId>();
for(int i = 0; i < maxMessages; i++) for(int i = 0; i < maxMessages; i++)
@@ -219,7 +219,7 @@ public class ConstantsTest extends BriarTestCase {
private void testMessageIdsFitIntoRequest(int length) throws Exception { private void testMessageIdsFitIntoRequest(int length) throws Exception {
// Create a request with as many message IDs as possible // Create a request with as many message IDs as possible
ByteArrayOutputStream out = new ByteArrayOutputStream(length); ByteArrayOutputStream out = new ByteArrayOutputStream(length);
PacketWriter writer = packetWriterFactory.createPacketWriter(out, true); PacketWriter writer = packetWriterFactory.createPacketWriter(out);
int maxMessages = writer.getMaxMessagesForRequest(length); int maxMessages = writer.getMaxMessagesForRequest(length);
Collection<MessageId> ids = new ArrayList<MessageId>(); Collection<MessageId> ids = new ArrayList<MessageId>();
for(int i = 0; i < maxMessages; i++) for(int i = 0; i < maxMessages; i++)
@@ -232,7 +232,7 @@ public class ConstantsTest extends BriarTestCase {
private void testMessageIdsFitIntoOffer(int length) throws Exception { private void testMessageIdsFitIntoOffer(int length) throws Exception {
// Create an offer with as many message IDs as possible // Create an offer with as many message IDs as possible
ByteArrayOutputStream out = new ByteArrayOutputStream(length); ByteArrayOutputStream out = new ByteArrayOutputStream(length);
PacketWriter writer = packetWriterFactory.createPacketWriter(out, true); PacketWriter writer = packetWriterFactory.createPacketWriter(out);
int maxMessages = writer.getMaxMessagesForOffer(length); int maxMessages = writer.getMaxMessagesForOffer(length);
Collection<MessageId> ids = new ArrayList<MessageId>(); Collection<MessageId> ids = new ArrayList<MessageId>();
for(int i = 0; i < maxMessages; i++) for(int i = 0; i < maxMessages; i++)