Wait for all data to be acked when flushing output stream. Logging.

This commit is contained in:
akwizgran
2012-12-06 18:20:55 +00:00
parent 7759c10d23
commit 64a8fb1888
2 changed files with 36 additions and 27 deletions

View File

@@ -1,6 +1,6 @@
package net.sf.briar.plugins.modem; package net.sf.briar.plugins.modem;
import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import java.io.IOException; import java.io.IOException;
@@ -42,24 +42,24 @@ class Sender {
a.setWindowSize(windowSize); a.setWindowSize(windowSize);
a.setChecksum(a.calculateChecksum()); a.setChecksum(a.calculateChecksum());
if(sequenceNumber == 0L) { if(sequenceNumber == 0L) {
if(LOG.isLoggable(FINE)) LOG.fine("Sending window update"); if(LOG.isLoggable(INFO)) LOG.info("Sending window update");
} else { } else {
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("Acknowledging #" + sequenceNumber); LOG.info("Acknowledging #" + sequenceNumber);
} }
writeHandler.handleWrite(a.getBuffer()); writeHandler.handleWrite(a.getBuffer());
} }
void handleAck(byte[] b) { void handleAck(byte[] b) {
if(b.length != Ack.LENGTH) { if(b.length != Ack.LENGTH) {
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("Ignoring ack frame with invalid length"); LOG.info("Ignoring ack frame with invalid length");
return; return;
} }
Ack a = new Ack(b); Ack a = new Ack(b);
if(a.getChecksum() != a.calculateChecksum()) { if(a.getChecksum() != a.calculateChecksum()) {
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("Incorrect checksum on ack frame"); LOG.info("Incorrect checksum on ack frame");
return; return;
} }
long sequenceNumber = a.getSequenceNumber(); long sequenceNumber = a.getSequenceNumber();
@@ -72,8 +72,8 @@ class Sender {
for(int i = 0; it.hasNext(); i++) { for(int i = 0; it.hasNext(); i++) {
Outstanding o = it.next(); Outstanding o = it.next();
if(o.data.getSequenceNumber() == sequenceNumber) { if(o.data.getSequenceNumber() == sequenceNumber) {
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("#" + sequenceNumber + " acknowledged"); LOG.info("#" + sequenceNumber + " acknowledged");
it.remove(); it.remove();
outstandingBytes -= o.data.getPayloadLength(); outstandingBytes -= o.data.getPayloadLength();
foundIndex = i; foundIndex = i;
@@ -86,8 +86,8 @@ class Sender {
timeout = rtt + (rttVar << 2); timeout = rtt + (rttVar << 2);
if(timeout < MIN_TIMEOUT) timeout = MIN_TIMEOUT; if(timeout < MIN_TIMEOUT) timeout = MIN_TIMEOUT;
else if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; else if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT;
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("RTT " + rtt + ", timeout " + timeout); LOG.info("RTT " + rtt + ", timeout " + timeout);
} }
break; break;
} }
@@ -95,8 +95,8 @@ class Sender {
// If any older data frames are outstanding, retransmit the oldest // If any older data frames are outstanding, retransmit the oldest
if(foundIndex > 0) { if(foundIndex > 0) {
fastRetransmit = outstanding.poll(); fastRetransmit = outstanding.poll();
if(LOG.isLoggable(FINE)) { if(LOG.isLoggable(INFO)) {
LOG.fine("Fast retransmitting #" LOG.info("Fast retransmitting #"
+ fastRetransmit.data.getSequenceNumber()); + fastRetransmit.data.getSequenceNumber());
} }
fastRetransmit.lastTransmitted = now; fastRetransmit.lastTransmitted = now;
@@ -108,7 +108,7 @@ class Sender {
int oldWindowSize = windowSize; int oldWindowSize = windowSize;
// Don't accept an unreasonably large window size // Don't accept an unreasonably large window size
windowSize = Math.min(a.getWindowSize(), Receiver.MAX_WINDOW_SIZE); windowSize = Math.min(a.getWindowSize(), Receiver.MAX_WINDOW_SIZE);
if(LOG.isLoggable(FINE)) LOG.fine("Window at sender " + windowSize); if(LOG.isLoggable(INFO)) LOG.info("Window at sender " + windowSize);
// If space has become available, notify any waiting writers // If space has become available, notify any waiting writers
if(windowSize > oldWindowSize || foundIndex != -1) notifyAll(); if(windowSize > oldWindowSize || foundIndex != -1) notifyAll();
} }
@@ -131,20 +131,20 @@ class Sender {
synchronized(this) { synchronized(this) {
if(outstanding.isEmpty()) { if(outstanding.isEmpty()) {
if(dataWaiting && now - lastWindowUpdateOrProbe > timeout) { if(dataWaiting && now - lastWindowUpdateOrProbe > timeout) {
if(LOG.isLoggable(FINE)) LOG.fine("Sending window probe"); if(LOG.isLoggable(INFO)) LOG.info("Sending window probe");
sendProbe = true; sendProbe = true;
timeout <<= 1; timeout <<= 1;
if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT;
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("Increasing timeout to " + timeout); LOG.info("Increasing timeout to " + timeout);
} }
} else { } else {
Iterator<Outstanding> it = outstanding.iterator(); Iterator<Outstanding> it = outstanding.iterator();
while(it.hasNext()) { while(it.hasNext()) {
Outstanding o = it.next(); Outstanding o = it.next();
if(now - o.lastTransmitted > timeout) { if(now - o.lastTransmitted > timeout) {
if(LOG.isLoggable(FINE)) { if(LOG.isLoggable(INFO)) {
LOG.fine("Retransmitting #" LOG.info("Retransmitting #"
+ o.data.getSequenceNumber()); + o.data.getSequenceNumber());
} }
it.remove(); it.remove();
@@ -153,8 +153,8 @@ class Sender {
retransmit.add(o); retransmit.add(o);
timeout <<= 1; timeout <<= 1;
if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT;
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("Increasing timeout to " + timeout); LOG.info("Increasing timeout to " + timeout);
} }
} }
if(retransmit != null) { if(retransmit != null) {
@@ -190,8 +190,8 @@ class Sender {
int payloadLength = d.getPayloadLength(); int payloadLength = d.getPayloadLength();
synchronized(this) { synchronized(this) {
while(outstandingBytes + payloadLength >= windowSize) { while(outstandingBytes + payloadLength >= windowSize) {
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("Waiting for space in the window"); LOG.info("Waiting for space in the window");
dataWaiting = true; dataWaiting = true;
wait(); wait();
} }
@@ -199,11 +199,15 @@ class Sender {
outstandingBytes += payloadLength; outstandingBytes += payloadLength;
dataWaiting = false; dataWaiting = false;
} }
if(LOG.isLoggable(FINE)) if(LOG.isLoggable(INFO))
LOG.fine("Transmitting #" + d.getSequenceNumber()); LOG.info("Transmitting #" + d.getSequenceNumber());
writeHandler.handleWrite(d.getBuffer()); writeHandler.handleWrite(d.getBuffer());
} }
synchronized void flush() throws IOException, InterruptedException {
while(dataWaiting || !outstanding.isEmpty()) wait();
}
private static class Outstanding { private static class Outstanding {
private final Data data; private final Data data;

View File

@@ -23,7 +23,12 @@ class SenderOutputStream extends OutputStream {
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
if(offset > Data.HEADER_LENGTH) send(false); if(offset > Data.HEADER_LENGTH) send(false);
// FIXME: Wait for asynchronous writes to complete try {
sender.flush();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while flushing");
}
} }
@Override @Override