Check for space before writing packets to simplex connections.

This commit is contained in:
akwizgran
2013-01-31 13:13:09 +00:00
parent 3e9d6113a2
commit 3356a22c81

View File

@@ -58,8 +58,6 @@ class OutgoingSimplexConnection {
transportId = ctx.getTransportId();
}
// FIXME: Write each packet to a buffer, check for capacity before writing
// it to the connection (except raw messages, which are already serialised)
void write() {
connRegistry.registerConnection(contactId, transportId);
try {
@@ -72,26 +70,12 @@ class OutgoingSimplexConnection {
PacketWriter writer = protoFactory.createPacketWriter(out,
transport.shouldFlush());
// Send the initial packets: updates and acks
Collection<TransportAck> transportAcks =
db.generateTransportAcks(contactId);
if(transportAcks != null) {
for(TransportAck ta : transportAcks)
writer.writeTransportAck(ta);
}
Collection<TransportUpdate> transportUpdates =
db.generateTransportUpdates(contactId);
if(transportUpdates != null) {
for(TransportUpdate tu : transportUpdates)
writer.writeTransportUpdate(tu);
}
SubscriptionAck sa = db.generateSubscriptionAck(contactId);
if(sa != null) writer.writeSubscriptionAck(sa);
SubscriptionUpdate su = db.generateSubscriptionUpdate(contactId);
if(su != null) writer.writeSubscriptionUpdate(su);
RetentionAck ra = db.generateRetentionAck(contactId);
if(ra != null) writer.writeRetentionAck(ra);
RetentionUpdate ru = db.generateRetentionUpdate(contactId);
if(ru != null) writer.writeRetentionUpdate(ru);
boolean hasSpace = writeTransportAcks(conn, writer);
if(hasSpace) hasSpace = writeTransportUpdates(conn, writer);
if(hasSpace) hasSpace = writeSubscriptionAck(conn, writer);
if(hasSpace) hasSpace = writeSubscriptionUpdate(conn, writer);
if(hasSpace) hasSpace = writeRetentionAck(conn, writer);
if(hasSpace) hasSpace = writeRetentionUpdate(conn, writer);
// Write acks until you can't write acks no more
long capacity = conn.getRemainingCapacity();
int maxMessages = writer.getMaxMessagesForAck(capacity);
@@ -126,6 +110,67 @@ class OutgoingSimplexConnection {
}
}
private boolean writeTransportAcks(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
Collection<TransportAck> acks = db.generateTransportAcks(contactId);
if(acks == null) return true;
for(TransportAck a : acks) {
writer.writeTransportAck(a);
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH) return false;
}
return true;
}
private boolean writeTransportUpdates(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
Collection<TransportUpdate> updates =
db.generateTransportUpdates(contactId);
if(updates == null) return true;
for(TransportUpdate u : updates) {
writer.writeTransportUpdate(u);
if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH) return false;
}
return true;
}
private boolean writeSubscriptionAck(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(a == null) return true;
writer.writeSubscriptionAck(a);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeSubscriptionUpdate(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId);
if(u == null) return true;
writer.writeSubscriptionUpdate(u);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeRetentionAck(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
RetentionAck a = db.generateRetentionAck(contactId);
if(a == null) return true;
writer.writeRetentionAck(a);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private boolean writeRetentionUpdate(ConnectionWriter conn,
PacketWriter writer) throws DbException, IOException {
assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
RetentionUpdate u = db.generateRetentionUpdate(contactId);
if(u == null) return true;
writer.writeRetentionUpdate(u);
return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH;
}
private void dispose(boolean exception) {
ByteUtils.erase(ctx.getSecret());
try {