Comments to indicate which locks guard which variables.

This commit is contained in:
akwizgran
2015-01-29 11:05:46 +00:00
parent 47bd84122e
commit 0dbfd7073f
21 changed files with 235 additions and 1092 deletions

View File

@@ -21,15 +21,17 @@ class Receiver implements ReadHandler {
private final Clock clock;
private final Sender sender;
private final SortedSet<Data> dataFrames;
private final Lock windowLock = new ReentrantLock();
private final Condition dataFrameAvailable = windowLock.newCondition();
// The following are locking: windowLock
private final SortedSet<Data> dataFrames;
private int windowSize = MAX_WINDOW_SIZE;
private long finalSequenceNumber = Long.MAX_VALUE;
private long nextSequenceNumber = 1;
private volatile boolean valid = true;
private Lock synchLock = new ReentrantLock();
private Condition dataFrameAvailable = synchLock.newCondition();
Receiver(Clock clock, Sender sender) {
this.sender = sender;
@@ -38,7 +40,7 @@ class Receiver implements ReadHandler {
}
Data read() throws IOException, InterruptedException {
synchLock.lock();
windowLock.lock();
try {
long now = clock.currentTimeMillis(), end = now + READ_TIMEOUT;
while(now < end && valid) {
@@ -64,17 +66,17 @@ class Receiver implements ReadHandler {
if(valid) throw new IOException("Read timed out");
throw new IOException("Connection closed");
} finally {
synchLock.unlock();
windowLock.unlock();
}
}
void invalidate() {
valid = false;
synchLock.lock();
windowLock.lock();
try {
dataFrameAvailable.signalAll();
} finally {
synchLock.unlock();
windowLock.unlock();
}
}
@@ -95,7 +97,7 @@ class Receiver implements ReadHandler {
}
private void handleData(byte[] b) throws IOException {
synchLock.lock();
windowLock.lock();
try {
if(b.length < Data.MIN_LENGTH || b.length > Data.MAX_LENGTH) {
// Ignore data frame with invalid length
@@ -134,7 +136,7 @@ class Receiver implements ReadHandler {
// Acknowledge the data frame even if it's a duplicate
sender.sendAck(sequenceNumber, windowSize);
} finally {
synchLock.unlock();
windowLock.unlock();
}
}

View File

@@ -26,8 +26,11 @@ class Sender {
private final Clock clock;
private final WriteHandler writeHandler;
private final LinkedList<Outstanding> outstanding;
private final Lock windowLock = new ReentrantLock();
private final Condition sendWindowAvailable = windowLock.newCondition();
// The following are locking: windowLock
private final LinkedList<Outstanding> outstanding;
private int outstandingBytes = 0;
private int windowSize = Data.MAX_PAYLOAD_LENGTH;
private int rtt = INITIAL_RTT, rttVar = INITIAL_RTT_VAR;
@@ -35,9 +38,6 @@ class Sender {
private long lastWindowUpdateOrProbe = Long.MAX_VALUE;
private boolean dataWaiting = false;
private Lock synchLock = new ReentrantLock();
private Condition sendWindowAvailable = synchLock.newCondition();
Sender(Clock clock, WriteHandler writeHandler) {
this.clock = clock;
this.writeHandler = writeHandler;
@@ -65,7 +65,7 @@ class Sender {
long sequenceNumber = a.getSequenceNumber();
long now = clock.currentTimeMillis();
Outstanding fastRetransmit = null;
synchLock.lock();
windowLock.lock();
try {
// Remove the acked data frame if it's outstanding
int foundIndex = -1;
@@ -105,7 +105,7 @@ class Sender {
if(windowSize > oldWindowSize || foundIndex != -1)
sendWindowAvailable.signalAll();
} finally {
synchLock.unlock();
windowLock.unlock();
}
// Fast retransmission
if(fastRetransmit != null)
@@ -116,7 +116,7 @@ class Sender {
long now = clock.currentTimeMillis();
List<Outstanding> retransmit = null;
boolean sendProbe = false;
synchLock.lock();
windowLock.lock();
try {
if(outstanding.isEmpty()) {
if(dataWaiting && now - lastWindowUpdateOrProbe > rto) {
@@ -147,7 +147,7 @@ class Sender {
}
}
} finally {
synchLock.unlock();
windowLock.unlock();
}
// Send a window probe if necessary
if(sendProbe) {
@@ -165,7 +165,7 @@ class Sender {
void write(Data d) throws IOException, InterruptedException {
int payloadLength = d.getPayloadLength();
synchLock.lock();
windowLock.lock();
try {
// Wait for space in the window
long now = clock.currentTimeMillis(), end = now + WRITE_TIMEOUT;
@@ -180,18 +180,18 @@ class Sender {
outstandingBytes += payloadLength;
dataWaiting = false;
} finally {
synchLock.unlock();
windowLock.unlock();
}
writeHandler.handleWrite(d.getBuffer());
}
void flush() throws IOException, InterruptedException {
synchLock.lock();
windowLock.lock();
try {
while(dataWaiting || !outstanding.isEmpty())
sendWindowAvailable.await();
} finally {
synchLock.unlock();
windowLock.unlock();
}
}