Improved encapsulation of thread synchronisation as follows

- replaced use of Object instance mutex with a private final Lock object
- replaced Object signaling with specific condition signalling
This commit is contained in:
Abraham Kiggundu
2014-12-23 23:55:56 +03:00
parent 276dcb1038
commit b074978472
19 changed files with 1001 additions and 478 deletions

View File

@@ -5,6 +5,10 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.briarproject.api.reliability.ReadHandler;
import org.briarproject.api.system.Clock;
@@ -23,6 +27,8 @@ class Receiver implements ReadHandler {
private long nextSequenceNumber = 1;
private volatile boolean valid = true;
private Lock synchLock = new ReentrantLock();
private Condition dataFrameAvailable = synchLock.newCondition();
Receiver(Clock clock, Sender sender) {
this.sender = sender;
@@ -30,36 +36,46 @@ class Receiver implements ReadHandler {
dataFrames = new TreeSet<Data>(new SequenceNumberComparator());
}
synchronized Data read() throws IOException, InterruptedException {
long now = clock.currentTimeMillis(), end = now + READ_TIMEOUT;
while(now < end && valid) {
if(dataFrames.isEmpty()) {
// Wait for a data frame
wait(end - now);
} else {
Data d = dataFrames.first();
if(d.getSequenceNumber() == nextSequenceNumber) {
dataFrames.remove(d);
// Update the window
windowSize += d.getPayloadLength();
sender.sendAck(0, windowSize);
nextSequenceNumber++;
return d;
Data read() throws IOException, InterruptedException {
synchLock.lock();
try{
long now = clock.currentTimeMillis(), end = now + READ_TIMEOUT;
while(now < end && valid) {
if(dataFrames.isEmpty()) {
// Wait for a data frame
dataFrameAvailable.await(end - now, TimeUnit.MILLISECONDS);
} else {
// Wait for the next in-order data frame
wait(end - now);
Data d = dataFrames.first();
if(d.getSequenceNumber() == nextSequenceNumber) {
dataFrames.remove(d);
// Update the window
windowSize += d.getPayloadLength();
sender.sendAck(0, windowSize);
nextSequenceNumber++;
return d;
} else {
// Wait for the next in-order data frame
dataFrameAvailable.await(end - now, TimeUnit.MILLISECONDS);
}
}
now = clock.currentTimeMillis();
}
now = clock.currentTimeMillis();
if(valid) throw new IOException("Read timed out");
throw new IOException("Connection closed");
}
finally{
synchLock.unlock();
}
if(valid) throw new IOException("Read timed out");
throw new IOException("Connection closed");
}
void invalidate() {
valid = false;
synchronized(this) {
notifyAll();
synchLock.lock();
try {
dataFrameAvailable.signalAll();
}
finally{
synchLock.unlock();
}
}
@@ -79,43 +95,49 @@ class Receiver implements ReadHandler {
}
}
private synchronized void handleData(byte[] b) throws IOException {
if(b.length < Data.MIN_LENGTH || b.length > Data.MAX_LENGTH) {
// Ignore data frame with invalid length
return;
}
Data d = new Data(b);
int payloadLength = d.getPayloadLength();
if(payloadLength > windowSize) return; // No space in the window
if(d.getChecksum() != d.calculateChecksum()) {
// Ignore data frame with invalid checksum
return;
}
long sequenceNumber = d.getSequenceNumber();
if(sequenceNumber == 0) {
// Window probe
} else if(sequenceNumber < nextSequenceNumber) {
// Duplicate data frame
} else if(d.isLastFrame()) {
finalSequenceNumber = sequenceNumber;
// Remove any data frames with higher sequence numbers
Iterator<Data> it = dataFrames.iterator();
while(it.hasNext()) {
Data d1 = it.next();
if(d1.getSequenceNumber() >= finalSequenceNumber) it.remove();
private void handleData(byte[] b) throws IOException {
synchLock.lock();
try{
if(b.length < Data.MIN_LENGTH || b.length > Data.MAX_LENGTH) {
// Ignore data frame with invalid length
return;
}
if(dataFrames.add(d)) {
windowSize -= payloadLength;
notifyAll();
Data d = new Data(b);
int payloadLength = d.getPayloadLength();
if(payloadLength > windowSize) return; // No space in the window
if(d.getChecksum() != d.calculateChecksum()) {
// Ignore data frame with invalid checksum
return;
}
} else if(sequenceNumber < finalSequenceNumber) {
if(dataFrames.add(d)) {
windowSize -= payloadLength;
notifyAll();
long sequenceNumber = d.getSequenceNumber();
if(sequenceNumber == 0) {
// Window probe
} else if(sequenceNumber < nextSequenceNumber) {
// Duplicate data frame
} else if(d.isLastFrame()) {
finalSequenceNumber = sequenceNumber;
// Remove any data frames with higher sequence numbers
Iterator<Data> it = dataFrames.iterator();
while(it.hasNext()) {
Data d1 = it.next();
if(d1.getSequenceNumber() >= finalSequenceNumber) it.remove();
}
if(dataFrames.add(d)) {
windowSize -= payloadLength;
dataFrameAvailable.signalAll();
}
} else if(sequenceNumber < finalSequenceNumber) {
if(dataFrames.add(d)) {
windowSize -= payloadLength;
dataFrameAvailable.signalAll();
}
}
// Acknowledge the data frame even if it's a duplicate
sender.sendAck(sequenceNumber, windowSize);
}
finally{
synchLock.unlock();
}
// Acknowledge the data frame even if it's a duplicate
sender.sendAck(sequenceNumber, windowSize);
}
private static class SequenceNumberComparator implements Comparator<Data> {

View File

@@ -5,6 +5,10 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.briarproject.api.reliability.WriteHandler;
import org.briarproject.api.system.Clock;
@@ -31,6 +35,9 @@ class Sender {
private long lastWindowUpdateOrProbe = Long.MAX_VALUE;
private boolean dataWaiting = false;
private Lock synchLock = new ReentrantLock();
private Condition sendWindowAvailable = synchLock.newCondition();
Sender(Clock clock, WriteHandler writeHandler) {
this.clock = clock;
this.writeHandler = writeHandler;
@@ -58,7 +65,8 @@ class Sender {
long sequenceNumber = a.getSequenceNumber();
long now = clock.currentTimeMillis();
Outstanding fastRetransmit = null;
synchronized(this) {
synchLock.lock();
try {
// Remove the acked data frame if it's outstanding
int foundIndex = -1;
Iterator<Outstanding> it = outstanding.iterator();
@@ -96,6 +104,9 @@ class Sender {
// If space has become available, notify any waiting writers
if(windowSize > oldWindowSize || foundIndex != -1) notifyAll();
}
finally{
synchLock.unlock();
}
// Fast retransmission
if(fastRetransmit != null)
writeHandler.handleWrite(fastRetransmit.data.getBuffer());
@@ -105,7 +116,8 @@ class Sender {
long now = clock.currentTimeMillis();
List<Outstanding> retransmit = null;
boolean sendProbe = false;
synchronized(this) {
synchLock.lock();
try {
if(outstanding.isEmpty()) {
if(dataWaiting && now - lastWindowUpdateOrProbe > rto) {
sendProbe = true;
@@ -135,6 +147,9 @@ class Sender {
}
}
}
finally{
synchLock.unlock();
}
// Send a window probe if necessary
if(sendProbe) {
byte[] buf = new byte[Data.MIN_LENGTH];
@@ -151,12 +166,13 @@ class Sender {
void write(Data d) throws IOException, InterruptedException {
int payloadLength = d.getPayloadLength();
synchronized(this) {
synchLock.lock();
try {
// Wait for space in the window
long now = clock.currentTimeMillis(), end = now + WRITE_TIMEOUT;
while(now < end && outstandingBytes + payloadLength >= windowSize) {
dataWaiting = true;
wait(end - now);
sendWindowAvailable.await(end - now, TimeUnit.MILLISECONDS);
now = clock.currentTimeMillis();
}
if(outstandingBytes + payloadLength >= windowSize)
@@ -165,11 +181,20 @@ class Sender {
outstandingBytes += payloadLength;
dataWaiting = false;
}
finally{
synchLock.unlock();
}
writeHandler.handleWrite(d.getBuffer());
}
synchronized void flush() throws IOException, InterruptedException {
while(dataWaiting || !outstanding.isEmpty()) wait();
void flush() throws IOException, InterruptedException {
synchLock.lock();
try{
while(dataWaiting || !outstanding.isEmpty()) sendWindowAvailable.await();
}
finally{
synchLock.unlock();
}
}
private static class Outstanding {