Reliability layer must call Sender.tick() periodically.

This commit is contained in:
akwizgran
2012-12-14 20:44:28 +00:00
parent 1bc3d52957
commit 1e1a226f30
2 changed files with 31 additions and 30 deletions

View File

@@ -1,5 +1,6 @@
package net.sf.briar.plugins.modem; package net.sf.briar.plugins.modem;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import java.io.IOException; import java.io.IOException;
@@ -11,6 +12,8 @@ import java.util.logging.Logger;
class ReliabilityLayer implements ReadHandler, WriteHandler { class ReliabilityLayer implements ReadHandler, WriteHandler {
private static final int TICK_INTERVAL = 500; // Milliseconds
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(ReliabilityLayer.class.getName()); Logger.getLogger(ReliabilityLayer.class.getName());
@@ -31,7 +34,7 @@ class ReliabilityLayer implements ReadHandler, WriteHandler {
void start() { void start() {
SlipEncoder encoder = new SlipEncoder(this); SlipEncoder encoder = new SlipEncoder(this);
Sender sender = new Sender(encoder); final Sender sender = new Sender(encoder);
receiver = new Receiver(sender); receiver = new Receiver(sender);
decoder = new SlipDecoder(receiver); decoder = new SlipDecoder(receiver);
inputStream = new ReceiverInputStream(receiver); inputStream = new ReceiverInputStream(receiver);
@@ -39,11 +42,22 @@ class ReliabilityLayer implements ReadHandler, WriteHandler {
writer = new Thread("ReliabilityLayer") { writer = new Thread("ReliabilityLayer") {
@Override @Override
public void run() { public void run() {
long now = System.currentTimeMillis();
long next = now + TICK_INTERVAL;
try { try {
while(running) { while(running) {
byte[] b = writes.take(); byte[] b = null;
if(b.length == 0) return; // Poison pill while(now < next && b == null) {
writeHandler.handleWrite(b); b = writes.poll(next - now, MILLISECONDS);
now = System.currentTimeMillis();
}
if(b == null) {
sender.tick();
while(next <= now) next += TICK_INTERVAL;
} else {
if(b.length == 0) return; // Poison pill
writeHandler.handleWrite(b);
}
} }
} catch(InterruptedException e) { } catch(InterruptedException e) {
if(LOG.isLoggable(WARNING)) if(LOG.isLoggable(WARNING))
@@ -52,7 +66,7 @@ class ReliabilityLayer implements ReadHandler, WriteHandler {
running = false; running = false;
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(WARNING)) if(LOG.isLoggable(WARNING))
LOG.warning("Interrupted while writing"); LOG.log(WARNING, e.toString(), e);
running = false; running = false;
} }
} }
@@ -70,7 +84,6 @@ class ReliabilityLayer implements ReadHandler, WriteHandler {
} }
void stop() { void stop() {
if(!running) throw new IllegalStateException();
running = false; running = false;
receiver.invalidate(); receiver.invalidate();
writes.add(new byte[0]); // Poison pill writes.add(new byte[0]); // Poison pill

View File

@@ -1,19 +1,13 @@
package net.sf.briar.plugins.modem; package net.sf.briar.plugins.modem;
import static java.util.logging.Level.WARNING;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.logging.Logger;
class Sender { class Sender {
private static final Logger LOG =
Logger.getLogger(Sender.class.getName());
// All times are in milliseconds // All times are in milliseconds
private static final int MIN_TIMEOUT = 1000; private static final int MIN_TIMEOUT = 1000;
private static final int MAX_TIMEOUT = 60 * 1000; private static final int MAX_TIMEOUT = 60 * 1000;
@@ -99,7 +93,7 @@ class Sender {
writeHandler.handleWrite(fastRetransmit.data.getBuffer()); writeHandler.handleWrite(fastRetransmit.data.getBuffer());
} }
void tick() { void tick() throws IOException {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
List<Outstanding> retransmit = null; List<Outstanding> retransmit = null;
boolean sendProbe = false; boolean sendProbe = false;
@@ -132,23 +126,17 @@ class Sender {
} }
} }
} }
try { // Send a window probe if necessary
// Send a window probe if necessary if(sendProbe) {
if(sendProbe) { byte[] buf = new byte[Data.MIN_LENGTH];
byte[] buf = new byte[Data.MIN_LENGTH]; Data probe = new Data(buf);
Data probe = new Data(buf); probe.setChecksum(probe.calculateChecksum());
probe.setChecksum(probe.calculateChecksum()); writeHandler.handleWrite(buf);
writeHandler.handleWrite(buf); }
} // Retransmit any lost data frames
// Retransmit any lost data frames if(retransmit != null) {
if(retransmit != null) { for(Outstanding o : retransmit)
for(Outstanding o : retransmit) writeHandler.handleWrite(o.data.getBuffer());
writeHandler.handleWrite(o.data.getBuffer());
}
} catch(IOException e) {
// FIXME: Do something more meaningful
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return;
} }
} }