Incoming reliability layer with support for reordering (untested).

This commit is contained in:
akwizgran
2012-01-21 18:48:24 +00:00
parent 48ceaaea2a
commit 9f1e3dea21
8 changed files with 103 additions and 26 deletions

View File

@@ -55,8 +55,7 @@ class ConnectionReaderFactoryImpl implements ConnectionReaderFactory {
new IncomingAuthenticationLayerImpl(correction, mac, macKey); new IncomingAuthenticationLayerImpl(correction, mac, macKey);
// No reordering or retransmission // No reordering or retransmission
IncomingReliabilityLayer reliability = IncomingReliabilityLayer reliability =
new IncomingReliabilityLayerImpl(authentication, new NullIncomingReliabilityLayer(authentication);
new NullFrameWindow());
// Create the reader - don't tolerate errors // Create the reader - don't tolerate errors
return new ConnectionReaderImpl(reliability, false); return new ConnectionReaderImpl(reliability, false);
} }
@@ -93,8 +92,7 @@ class ConnectionReaderFactoryImpl implements ConnectionReaderFactory {
new IncomingAuthenticationLayerImpl(correction, mac, macKey); new IncomingAuthenticationLayerImpl(correction, mac, macKey);
// No reordering or retransmission // No reordering or retransmission
IncomingReliabilityLayer reliability = IncomingReliabilityLayer reliability =
new IncomingReliabilityLayerImpl(authentication, new NullIncomingReliabilityLayer(authentication);
new NullFrameWindow());
// Create the reader - don't tolerate errors // Create the reader - don't tolerate errors
return new ConnectionReaderImpl(reliability, false); return new ConnectionReaderImpl(reliability, false);
} }

View File

@@ -12,14 +12,13 @@ class ConnectionReaderImpl extends InputStream implements ConnectionReader {
private final IncomingReliabilityLayer in; private final IncomingReliabilityLayer in;
private final boolean tolerateErrors; private final boolean tolerateErrors;
private final Frame frame;
private Frame frame;
private int offset = 0, length = 0; private int offset = 0, length = 0;
ConnectionReaderImpl(IncomingReliabilityLayer in, boolean tolerateErrors) { ConnectionReaderImpl(IncomingReliabilityLayer in, boolean tolerateErrors) {
this.in = in; this.in = in;
this.tolerateErrors = tolerateErrors; this.tolerateErrors = tolerateErrors;
frame = new Frame(in.getMaxFrameLength());
} }
public InputStream getInputStream() { public InputStream getInputStream() {
@@ -28,7 +27,8 @@ class ConnectionReaderImpl extends InputStream implements ConnectionReader {
@Override @Override
public int read() throws IOException { public int read() throws IOException {
while(length == 0) if(!readValidFrame()) return -1; if(length == -1) return -1;
while(length == 0) if(!readFrame()) return -1;
int b = frame.getBuffer()[offset] & 0xff; int b = frame.getBuffer()[offset] & 0xff;
offset++; offset++;
length--; length--;
@@ -42,7 +42,8 @@ class ConnectionReaderImpl extends InputStream implements ConnectionReader {
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
while(length == 0) if(!readValidFrame()) return -1; if(length == -1) return -1;
while(length == 0) if(!readFrame()) return -1;
len = Math.min(len, length); len = Math.min(len, length);
System.arraycopy(frame.getBuffer(), offset, b, off, len); System.arraycopy(frame.getBuffer(), offset, b, off, len);
offset += len; offset += len;
@@ -50,11 +51,15 @@ class ConnectionReaderImpl extends InputStream implements ConnectionReader {
return len; return len;
} }
private boolean readValidFrame() throws IOException { private boolean readFrame() throws IOException {
assert length == 0; assert length == 0;
while(true) { while(true) {
try { try {
if(!in.readFrame(frame)) return false; frame = in.readFrame(frame);
if(frame == null) {
length = -1;
return false;
}
offset = FRAME_HEADER_LENGTH; offset = FRAME_HEADER_LENGTH;
length = HeaderEncoder.getPayloadLength(frame.getBuffer()); length = HeaderEncoder.getPayloadLength(frame.getBuffer());
return true; return true;

View File

@@ -26,6 +26,11 @@ class Frame {
return buf; return buf;
} }
public long getFrameNumber() {
if(length == -1) throw new IllegalStateException();
return HeaderEncoder.getFrameNumber(buf);
}
public int getLength() { public int getLength() {
if(length == -1) throw new IllegalStateException(); if(length == -1) throw new IllegalStateException();
return length; return length;

View File

@@ -5,14 +5,14 @@ import java.io.IOException;
interface IncomingReliabilityLayer { interface IncomingReliabilityLayer {
/** /**
* Reads a frame into the given buffer. Returns false if no more frames * Reads and returns a frame, possibly using the given buffer. Returns null
* can be read from the connection. * if no more frames can be read from the connection.
* @throws IOException if an unrecoverable error occurs and the connection * @throws IOException if an unrecoverable error occurs and the connection
* must be closed. * must be closed.
* @throws InvalidDataException if a recoverable error occurs. The caller * @throws InvalidDataException if a recoverable error occurs. The caller
* may choose whether to retry the read or close the connection. * may choose whether to retry the read or close the connection.
*/ */
boolean readFrame(Frame f) throws IOException, InvalidDataException; Frame readFrame(Frame f) throws IOException, InvalidDataException;
/** Returns the maximum length in bytes of the frames this layer returns. */ /** Returns the maximum length in bytes of the frames this layer returns. */
int getMaxFrameLength(); int getMaxFrameLength();

View File

@@ -1,25 +1,69 @@
package net.sf.briar.transport; package net.sf.briar.transport;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.ListIterator;
class IncomingReliabilityLayerImpl implements IncomingReliabilityLayer { class IncomingReliabilityLayerImpl implements IncomingReliabilityLayer {
private final IncomingAuthenticationLayer in; private final IncomingAuthenticationLayer in;
private final FrameWindow window;
private final int maxFrameLength; private final int maxFrameLength;
private final FrameWindow window;
private final LinkedList<Frame> frames;
private final ArrayList<Frame> freeFrames;
IncomingReliabilityLayerImpl(IncomingAuthenticationLayer in, private long nextFrameNumber = 0L;
FrameWindow window) {
IncomingReliabilityLayerImpl(IncomingAuthenticationLayer in) {
this.in = in; this.in = in;
this.window = window;
maxFrameLength = in.getMaxFrameLength(); maxFrameLength = in.getMaxFrameLength();
window = new FrameWindowImpl();
frames = new LinkedList<Frame>();
freeFrames = new ArrayList<Frame>();
} }
public boolean readFrame(Frame f) throws IOException, InvalidDataException { public Frame readFrame(Frame f) throws IOException,
if(!in.readFrame(f, window)) return false; InvalidDataException {
long frameNumber = HeaderEncoder.getFrameNumber(f.getBuffer()); freeFrames.add(f);
if(!window.remove(frameNumber)) throw new IllegalStateException(); // Read frames until there's an in-order frame to return
return true; Frame next = frames.peek();
while(next == null || next.getFrameNumber() > nextFrameNumber) {
// Grab a free frame, or allocate one if necessary
int free = freeFrames.size();
if(free == 0) f = new Frame(maxFrameLength);
else f = freeFrames.remove(free - 1);
// Read a frame
if(!in.readFrame(f, window)) return null;
// If the frame is in order, return it
long frameNumber = f.getFrameNumber();
if(frameNumber == nextFrameNumber) {
nextFrameNumber++;
return f;
}
// Insert the frame into the list
if(next == null || next.getFrameNumber() > frameNumber) {
frames.push(f);
} else {
boolean inserted = false;
ListIterator<Frame> it = frames.listIterator();
while(it.hasNext()) {
if(it.next().getFrameNumber() > frameNumber) {
// Insert the frame before the one just examined
it.previous();
it.add(f);
inserted = true;
break;
}
}
if(!inserted) frames.add(f);
}
next = frames.peek();
}
assert next != null && next.getFrameNumber() == nextFrameNumber;
frames.poll();
nextFrameNumber++;
return next;
} }
public int getMaxFrameLength() { public int getMaxFrameLength() {

View File

@@ -0,0 +1,27 @@
package net.sf.briar.transport;
import java.io.IOException;
class NullIncomingReliabilityLayer implements IncomingReliabilityLayer {
private final IncomingAuthenticationLayer in;
private final int maxFrameLength;
private final FrameWindow window;
NullIncomingReliabilityLayer(IncomingAuthenticationLayer in) {
this.in = in;
maxFrameLength = in.getMaxFrameLength();
window = new NullFrameWindow();
}
public Frame readFrame(Frame f) throws IOException, InvalidDataException {
if(!in.readFrame(f, window)) return null;
if(!window.remove(f.getFrameNumber()))
throw new IllegalStateException();
return f;
}
public int getMaxFrameLength() {
return maxFrameLength;
}
}

View File

@@ -224,8 +224,7 @@ public class ConnectionReaderImplTest extends TransportTest {
IncomingAuthenticationLayer authentication = IncomingAuthenticationLayer authentication =
new IncomingAuthenticationLayerImpl(correction, mac, macKey); new IncomingAuthenticationLayerImpl(correction, mac, macKey);
IncomingReliabilityLayer reliability = IncomingReliabilityLayer reliability =
new IncomingReliabilityLayerImpl(authentication, new NullIncomingReliabilityLayer(authentication);
new NullFrameWindow());
return new ConnectionReaderImpl(reliability, false); return new ConnectionReaderImpl(reliability, false);
} }
} }

View File

@@ -103,8 +103,7 @@ public class FrameReadWriteTest extends BriarTestCase {
IncomingAuthenticationLayer authenticationIn = IncomingAuthenticationLayer authenticationIn =
new IncomingAuthenticationLayerImpl(correctionIn, mac, macKey); new IncomingAuthenticationLayerImpl(correctionIn, mac, macKey);
IncomingReliabilityLayer reliabilityIn = IncomingReliabilityLayer reliabilityIn =
new IncomingReliabilityLayerImpl(authenticationIn, new NullIncomingReliabilityLayer(authenticationIn);
new NullFrameWindow());
ConnectionReader reader = new ConnectionReaderImpl(reliabilityIn, ConnectionReader reader = new ConnectionReaderImpl(reliabilityIn,
false); false);
InputStream in1 = reader.getInputStream(); InputStream in1 = reader.getInputStream();