Don't generate acks faster than the IO thread can write them.

This commit is contained in:
akwizgran
2011-12-08 12:56:00 +00:00
parent ae87100c8f
commit c1ab21ba2f

View File

@@ -154,7 +154,7 @@ abstract class StreamConnection implements DatabaseListener {
// Mark the unrequested messages as seen // Mark the unrequested messages as seen
dbExecutor.execute(new SetSeen(seen)); dbExecutor.execute(new SetSeen(seen));
// Start sending the requested messages // Start sending the requested messages
dbExecutor.execute(new GenerateBatch(requested)); dbExecutor.execute(new GenerateBatches(requested));
} else if(reader.hasSubscriptionUpdate()) { } else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate s = reader.readSubscriptionUpdate(); SubscriptionUpdate s = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); dbExecutor.execute(new ReceiveSubscriptionUpdate(s));
@@ -375,10 +375,7 @@ abstract class StreamConnection implements DatabaseListener {
int maxBatches = writer.getMaxBatchesForAck(Long.MAX_VALUE); int maxBatches = writer.getMaxBatchesForAck(Long.MAX_VALUE);
try { try {
Ack a = db.generateAck(contactId, maxBatches); Ack a = db.generateAck(contactId, maxBatches);
while(a != null) { if(a != null) writerTasks.add(new WriteAck(a));
writerTasks.add(new WriteAck(a));
a = db.generateAck(contactId, maxBatches);
}
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
} }
@@ -398,6 +395,7 @@ abstract class StreamConnection implements DatabaseListener {
assert writer != null; assert writer != null;
try { try {
writer.writeAck(ack); writer.writeAck(ack);
dbExecutor.execute(new GenerateAcks());
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); transport.dispose(false);
@@ -406,11 +404,11 @@ abstract class StreamConnection implements DatabaseListener {
} }
// This task runs on a database thred // This task runs on a database thred
private class GenerateBatch implements Runnable { private class GenerateBatches implements Runnable {
private final Collection<MessageId> requested; private final Collection<MessageId> requested;
private GenerateBatch(Collection<MessageId> requested) { private GenerateBatches(Collection<MessageId> requested) {
this.requested = requested; this.requested = requested;
} }
@@ -443,7 +441,7 @@ abstract class StreamConnection implements DatabaseListener {
try { try {
writer.writeBatch(batch); writer.writeBatch(batch);
if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer()); if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer());
else dbExecutor.execute(new GenerateBatch(requested)); else dbExecutor.execute(new GenerateBatches(requested));
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); transport.dispose(false);