Removed shouldFlush() from plugins, added missing PacketWriter method.

This commit is contained in:
akwizgran
2014-01-15 17:10:25 +00:00
parent c146da2e7a
commit 6af3c54c28
17 changed files with 54 additions and 68 deletions

View File

@@ -21,8 +21,7 @@ class PacketWriterFactoryImpl implements PacketWriterFactory {
this.writerFactory = writerFactory;
}
public PacketWriter createPacketWriter(OutputStream out,
boolean flush) {
public PacketWriter createPacketWriter(OutputStream out, boolean flush) {
return new PacketWriterImpl(serial, writerFactory, out, flush);
}
}

View File

@@ -47,19 +47,21 @@ class PacketWriterImpl implements PacketWriter {
w = writerFactory.createWriter(out);
}
public int getMaxMessagesForAck(long capacity) {
return getMaxMessagesForPacket(capacity, ACK);
}
public int getMaxMessagesForRequest(long capacity) {
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
int overhead = serial.getSerialisedStructStartLength(ACK)
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength()
+ serial.getSerialisedStructEndLength();
int idLength = serial.getSerialisedUniqueIdLength();
return (packet - overhead) / idLength;
return getMaxMessagesForPacket(capacity, REQUEST);
}
public int getMaxMessagesForOffer(long capacity) {
return getMaxMessagesForPacket(capacity, OFFER);
}
private int getMaxMessagesForPacket(long capacity, int structId) {
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
int overhead = serial.getSerialisedStructStartLength(OFFER)
int overhead = serial.getSerialisedStructStartLength(structId)
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength()
+ serial.getSerialisedStructEndLength();

View File

@@ -226,8 +226,7 @@ abstract class DuplexConnection implements EventListener {
db.addListener(this);
try {
OutputStream out = createConnectionWriter().getOutputStream();
writer = packetWriterFactory.createPacketWriter(out,
transport.shouldFlush());
writer = packetWriterFactory.createPacketWriter(out, true);
if(LOG.isLoggable(INFO)) LOG.info("Starting to write");
// Send the initial packets
dbExecutor.execute(new GenerateTransportAcks());
@@ -500,7 +499,7 @@ abstract class DuplexConnection implements EventListener {
public void run() {
assert writer != null;
int maxMessages = writer.getMaxMessagesForRequest(Long.MAX_VALUE);
int maxMessages = writer.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if(LOG.isLoggable(INFO))

View File

@@ -72,7 +72,7 @@ class OutgoingSimplexConnection {
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH)
throw new EOFException();
PacketWriter writer = packetWriterFactory.createPacketWriter(out,
transport.shouldFlush());
false);
// Send the initial packets: updates and acks
boolean hasSpace = writeTransportAcks(conn, writer);
if(hasSpace) hasSpace = writeTransportUpdates(conn, writer);
@@ -82,12 +82,12 @@ class OutgoingSimplexConnection {
if(hasSpace) hasSpace = writeRetentionUpdate(conn, writer);
// Write acks until you can't write acks no more
capacity = conn.getRemainingCapacity();
int maxMessages = writer.getMaxMessagesForRequest(capacity);
int maxMessages = writer.getMaxMessagesForAck(capacity);
Ack a = db.generateAck(contactId, maxMessages);
while(a != null) {
writer.writeAck(a);
capacity = conn.getRemainingCapacity();
maxMessages = writer.getMaxMessagesForRequest(capacity);
maxMessages = writer.getMaxMessagesForAck(capacity);
a = db.generateAck(contactId, maxMessages);
}
// Write messages until you can't write messages no more

View File

@@ -43,10 +43,6 @@ class FileTransportWriter implements SimplexTransportWriter {
return out;
}
public boolean shouldFlush() {
return false;
}
public void dispose(boolean exception) {
try {
out.close();

View File

@@ -34,10 +34,6 @@ class TcpTransportConnection implements DuplexTransportConnection {
return socket.getOutputStream();
}
public boolean shouldFlush() {
return true;
}
public void dispose(boolean exception, boolean recognised)
throws IOException {
socket.close();