SLTP reads and writes should time out eventually.

This commit is contained in:
akwizgran
2012-12-15 04:30:43 +00:00
parent 15c23f486e
commit 61e59f816d
4 changed files with 40 additions and 48 deletions

View File

@@ -4,7 +4,7 @@ import net.sf.briar.util.ByteUtils;
class Ack extends Frame { class Ack extends Frame {
static final int LENGTH = 12; static final int LENGTH = 11;
Ack() { Ack() {
super(new byte[LENGTH]); super(new byte[LENGTH]);
@@ -18,10 +18,10 @@ class Ack extends Frame {
} }
int getWindowSize() { int getWindowSize() {
return ByteUtils.readUint24(buf, 5); return ByteUtils.readUint16(buf, 5);
} }
void setWindowSize(int windowSize) { void setWindowSize(int windowSize) {
ByteUtils.writeUint24(windowSize, buf, 5); ByteUtils.writeUint16(windowSize, buf, 5);
} }
} }

View File

@@ -10,7 +10,8 @@ import net.sf.briar.api.reliability.ReadHandler;
class Receiver implements ReadHandler { class Receiver implements ReadHandler {
static final int MAX_WINDOW_SIZE = 8 * Data.MAX_PAYLOAD_LENGTH; private static final int READ_TIMEOUT = 5 * 60 * 1000; // Milliseconds
private static final int MAX_WINDOW_SIZE = 8 * Data.MAX_PAYLOAD_LENGTH;
private final Sender sender; private final Sender sender;
private final SortedSet<Data> dataFrames; // Locking: this private final SortedSet<Data> dataFrames; // Locking: this
@@ -27,10 +28,11 @@ class Receiver implements ReadHandler {
} }
synchronized Data read() throws IOException, InterruptedException { synchronized Data read() throws IOException, InterruptedException {
while(valid) { long now = System.currentTimeMillis(), end = now + READ_TIMEOUT;
while(now < end && valid) {
if(dataFrames.isEmpty()) { if(dataFrames.isEmpty()) {
// Wait for a data frame // Wait for a data frame
wait(); wait(end - now);
} else { } else {
Data d = dataFrames.first(); Data d = dataFrames.first();
if(d.getSequenceNumber() == nextSequenceNumber) { if(d.getSequenceNumber() == nextSequenceNumber) {
@@ -42,10 +44,12 @@ class Receiver implements ReadHandler {
return d; return d;
} else { } else {
// Wait for the next in-order data frame // Wait for the next in-order data frame
wait(); wait(end - now);
} }
} }
now = System.currentTimeMillis();
} }
if(valid) throw new IOException("Read timed out");
throw new IOException("Connection closed"); throw new IOException("Connection closed");
} }

View File

@@ -11,20 +11,23 @@ import net.sf.briar.api.reliability.WriteHandler;
class Sender { class Sender {
// All times are in milliseconds // All times are in milliseconds
private static final int MIN_TIMEOUT = 1000; private static final int WRITE_TIMEOUT = 5 * 60 * 1000;
private static final int MAX_TIMEOUT = 60 * 1000; private static final int MIN_RTO = 1000;
private static final int MAX_RTO = 60 * 1000;
private static final int INITIAL_RTT = 0; private static final int INITIAL_RTT = 0;
private static final int INITIAL_RTT_VAR = 3 * 1000; private static final int INITIAL_RTT_VAR = 3 * 1000;
private static final int MAX_WINDOW_SIZE = 64 * Data.MAX_PAYLOAD_LENGTH;
private final WriteHandler writeHandler; private final WriteHandler writeHandler;
private final LinkedList<Outstanding> outstanding; // Locking: this private final LinkedList<Outstanding> outstanding; // Locking: this
private int outstandingBytes = 0; // Locking: this // All of the following are locking: this
private int windowSize = Data.MAX_PAYLOAD_LENGTH; // Locking: this private int outstandingBytes = 0;
private int rtt = INITIAL_RTT, rttVar = INITIAL_RTT_VAR; // Locking: this private int windowSize = Data.MAX_PAYLOAD_LENGTH;
private int timeout = rtt + (rttVar << 2); // Locking: this private int rtt = INITIAL_RTT, rttVar = INITIAL_RTT_VAR;
private long lastWindowUpdateOrProbe = Long.MAX_VALUE; // Locking: this private int rto = rtt + (rttVar << 2);
private boolean dataWaiting = false; // Locking: this private long lastWindowUpdateOrProbe = Long.MAX_VALUE;
private boolean dataWaiting = false;
Sender(WriteHandler writeHandler) { Sender(WriteHandler writeHandler) {
this.writeHandler = writeHandler; this.writeHandler = writeHandler;
@@ -62,15 +65,15 @@ class Sender {
it.remove(); it.remove();
outstandingBytes -= o.data.getPayloadLength(); outstandingBytes -= o.data.getPayloadLength();
foundIndex = i; foundIndex = i;
// Update the round-trip time and retransmission timer // Update the round-trip time and retransmission timeout
if(!o.retransmitted) { if(!o.retransmitted) {
int sample = (int) (now - o.lastTransmitted); int sample = (int) (now - o.lastTransmitted);
int error = sample - rtt; int error = sample - rtt;
rtt += (error >> 3); rtt += (error >> 3);
rttVar += (Math.abs(error) - rttVar) >> 2; rttVar += (Math.abs(error) - rttVar) >> 2;
timeout = rtt + (rttVar << 2); rto = rtt + (rttVar << 2);
if(timeout < MIN_TIMEOUT) timeout = MIN_TIMEOUT; if(rto < MIN_RTO) rto = MIN_RTO;
else if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; else if(rto > MAX_RTO) rto = MAX_RTO;
} }
break; break;
} }
@@ -86,7 +89,7 @@ class Sender {
lastWindowUpdateOrProbe = now; lastWindowUpdateOrProbe = now;
int oldWindowSize = windowSize; int oldWindowSize = windowSize;
// Don't accept an unreasonably large window size // Don't accept an unreasonably large window size
windowSize = Math.min(a.getWindowSize(), Receiver.MAX_WINDOW_SIZE); windowSize = Math.min(a.getWindowSize(), MAX_WINDOW_SIZE);
// If space has become available, notify any waiting writers // If space has become available, notify any waiting writers
if(windowSize > oldWindowSize || foundIndex != -1) notifyAll(); if(windowSize > oldWindowSize || foundIndex != -1) notifyAll();
} }
@@ -101,22 +104,23 @@ class Sender {
boolean sendProbe = false; boolean sendProbe = false;
synchronized(this) { synchronized(this) {
if(outstanding.isEmpty()) { if(outstanding.isEmpty()) {
if(dataWaiting && now - lastWindowUpdateOrProbe > timeout) { if(dataWaiting && now - lastWindowUpdateOrProbe > rto) {
sendProbe = true; sendProbe = true;
timeout <<= 1; rto <<= 1;
if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; if(rto > MAX_RTO) rto = MAX_RTO;
} }
} else { } else {
Iterator<Outstanding> it = outstanding.iterator(); Iterator<Outstanding> it = outstanding.iterator();
while(it.hasNext()) { while(it.hasNext()) {
Outstanding o = it.next(); Outstanding o = it.next();
if(now - o.lastTransmitted > timeout) { if(now - o.lastTransmitted > rto) {
it.remove(); it.remove();
if(retransmit == null) if(retransmit == null)
retransmit = new ArrayList<Outstanding>(); retransmit = new ArrayList<Outstanding>();
retransmit.add(o); retransmit.add(o);
timeout <<= 1; // Update the retransmission timeout
if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; rto <<= 1;
if(rto > MAX_RTO) rto = MAX_RTO;
} }
} }
if(retransmit != null) { if(retransmit != null) {
@@ -146,10 +150,14 @@ class Sender {
int payloadLength = d.getPayloadLength(); int payloadLength = d.getPayloadLength();
synchronized(this) { synchronized(this) {
// Wait for space in the window // Wait for space in the window
while(outstandingBytes + payloadLength >= windowSize) { long now = System.currentTimeMillis(), end = now + WRITE_TIMEOUT;
while(now < end && outstandingBytes + payloadLength >= windowSize) {
dataWaiting = true; dataWaiting = true;
wait(); wait(end - now);
now = System.currentTimeMillis();
} }
if(outstandingBytes + payloadLength >= windowSize)
throw new IOException("Write timed out");
outstanding.add(new Outstanding(d)); outstanding.add(new Outstanding(d));
outstandingBytes += payloadLength; outstandingBytes += payloadLength;
dataWaiting = false; dataWaiting = false;

View File

@@ -7,11 +7,6 @@ public class ByteUtils {
*/ */
public static final int MAX_16_BIT_UNSIGNED = 65535; // 2^16 - 1 public static final int MAX_16_BIT_UNSIGNED = 65535; // 2^16 - 1
/**
* The maximum value that can be represented as an unsigned 24-bit integer.
*/
public static final int MAX_24_BIT_UNSIGNED = 16777215; // 2^24 - 1
/** /**
* The maximum value that can be represented as an unsigned 32-bit integer. * The maximum value that can be represented as an unsigned 32-bit integer.
*/ */
@@ -32,15 +27,6 @@ public class ByteUtils {
b[offset + 1] = (byte) (i & 0xFF); b[offset + 1] = (byte) (i & 0xFF);
} }
public static void writeUint24(long i, byte[] b, int offset) {
if(i < 0L) throw new IllegalArgumentException();
if(i > MAX_24_BIT_UNSIGNED) throw new IllegalArgumentException();
if(b.length < offset + 3) throw new IllegalArgumentException();
b[offset] = (byte) (i >> 16);
b[offset + 1] = (byte) (i >> 8 & 0xFF);
b[offset + 2] = (byte) (i & 0xFF);
}
public static void writeUint32(long i, byte[] b, int offset) { public static void writeUint32(long i, byte[] b, int offset) {
if(i < 0L) throw new IllegalArgumentException(); if(i < 0L) throw new IllegalArgumentException();
if(i > MAX_32_BIT_UNSIGNED) throw new IllegalArgumentException(); if(i > MAX_32_BIT_UNSIGNED) throw new IllegalArgumentException();
@@ -56,12 +42,6 @@ public class ByteUtils {
return ((b[offset] & 0xFF) << 8) | (b[offset + 1] & 0xFF); return ((b[offset] & 0xFF) << 8) | (b[offset + 1] & 0xFF);
} }
public static int readUint24(byte[] b, int offset) {
if(b.length < offset + 3) throw new IllegalArgumentException();
return ((b[offset] & 0xFF) << 16) | ((b[offset + 1] & 0xFF) << 8)
| (b[offset + 2] & 0xFF);
}
public static long readUint32(byte[] b, int offset) { public static long readUint32(byte[] b, int offset) {
if(b.length < offset + 4) throw new IllegalArgumentException(); if(b.length < offset + 4) throw new IllegalArgumentException();
return ((b[offset] & 0xFFL) << 24) | ((b[offset + 1] & 0xFFL) << 16) return ((b[offset] & 0xFFL) << 24) | ((b[offset + 1] & 0xFFL) << 16)