New retransmission mechanism, which does away with the need for bundle IDs and should cope better with high bandwidth-delay product links.

akwizgran
2011-07-14 09:39:15 +01:00
parent d889a08cf4
commit a121dcdda8
17 changed files with 206 additions and 299 deletions
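
The diffs below pin down the new rule: each outstanding batch is timestamped when it is sent, and a batch is treated as lost - so its messages become due for retransmission - once RETRANSMIT_THRESHOLD batches sent after it have been acknowledged. Loss is inferred from acks overtaking older batches rather than from echoed bundle IDs, so any number of batches can be in flight at once, which is what helps on links with a high bandwidth-delay product. A minimal in-memory sketch of the rule follows; the class name, field names and threshold value are illustrative only, not the database-backed implementation (which, judging by the tests' use of Thread.sleep, compares send timestamps rather than sequence numbers):

	import java.util.HashSet;
	import java.util.LinkedHashMap;
	import java.util.Map;
	import java.util.Set;

	// Illustrative sketch only - not the real net.sf.briar database code
	class LossTracker {

		// Assumed value; the real constant is Database.RETRANSMIT_THRESHOLD
		static final int RETRANSMIT_THRESHOLD = 3;

		private long sendSeq = 0L;
		// Outstanding batch -> {sequence number when sent, acks that overtook it}
		private final Map<String, long[]> outstanding =
				new LinkedHashMap<String, long[]>();

		// Record that a batch has just been sent
		void addOutstandingBatch(String batchId) {
			outstanding.put(batchId, new long[] {sendSeq++, 0L});
		}

		// The contact acked a batch: every batch sent earlier has been
		// overtaken by one more ack, and a batch that has been overtaken
		// RETRANSMIT_THRESHOLD times is considered lost
		Set<String> removeAckedBatch(String batchId) {
			Set<String> lost = new HashSet<String>();
			long[] acked = outstanding.remove(batchId);
			if(acked == null) return lost; // not outstanding, nothing to do
			for(Map.Entry<String, long[]> e : outstanding.entrySet()) {
				long[] v = e.getValue();
				if(v[0] < acked[0] && ++v[1] >= RETRANSMIT_THRESHOLD)
					lost.add(e.getKey());
			}
			outstanding.keySet().removeAll(lost);
			return lost;
		}
	}

Acking batches in the order they were sent never increments any counter, which is exactly what testNoRetransmission checks below; acking in reverse order increments the counter of every earlier-sent batch once per ack, matching testRetransmission.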


@@ -15,7 +15,6 @@ import net.sf.briar.api.db.Status;
 import net.sf.briar.api.protocol.AuthorId;
 import net.sf.briar.api.protocol.Batch;
 import net.sf.briar.api.protocol.BatchId;
-import net.sf.briar.api.protocol.BundleId;
 import net.sf.briar.api.protocol.BundleReader;
 import net.sf.briar.api.protocol.BundleWriter;
 import net.sf.briar.api.protocol.GroupId;
@@ -33,7 +32,6 @@ public abstract class DatabaseComponentTest extends TestCase {
 	protected final Object txn = new Object();
 	protected final AuthorId authorId;
 	protected final BatchId batchId;
-	protected final BundleId bundleId;
 	protected final ContactId contactId;
 	protected final GroupId groupId;
 	protected final MessageId messageId, parentId;
@@ -51,7 +49,6 @@ public abstract class DatabaseComponentTest extends TestCase {
 		super();
 		authorId = new AuthorId(TestUtils.getRandomId());
 		batchId = new BatchId(TestUtils.getRandomId());
-		bundleId = new BundleId(TestUtils.getRandomId());
 		contactId = new ContactId(123);
 		groupId = new GroupId(TestUtils.getRandomId());
 		messageId = new MessageId(TestUtils.getRandomId());
@@ -478,7 +475,6 @@ public abstract class DatabaseComponentTest extends TestCase {
 			will(returnValue(transports));
 			// Build the header
 			oneOf(bundleWriter).addHeader(acks, subs, transports);
-			will(returnValue(bundleId));
 			// Add a batch to the bundle
 			oneOf(bundleWriter).getRemainingCapacity();
 			will(returnValue(1024L * 1024L - headerSize));
@@ -579,9 +575,7 @@ public abstract class DatabaseComponentTest extends TestCase {
 			will(returnValue(null));
 			oneOf(bundleReader).finish();
 			// Lost batches
-			oneOf(header).getId();
-			will(returnValue(bundleId));
-			oneOf(database).addReceivedBundle(txn, contactId, bundleId);
+			oneOf(database).getLostBatches(txn, contactId);
 			will(returnValue(Collections.singleton(batchId)));
 			oneOf(database).removeLostBatch(txn, contactId, batchId);
 		}});
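
The expectations above capture the new receiving-side flow: once a bundle has been read there is no bundle ID to record, so instead of calling addReceivedBundle the component asks the database which outstanding batches are now considered lost and clears each one, making its messages eligible for retransmission. A sketch of that step, assuming only the Database calls visible in the mock (locking and error handling omitted):

	Set<BatchId> lost = database.getLostBatches(txn, contactId);
	for(BatchId batch : lost) {
		// Clear the batch's outstanding status so its messages
		// will be packed into a future bundle again
		database.removeLostBatch(txn, contactId, batch);
	}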


@@ -21,7 +21,6 @@ import net.sf.briar.api.db.DbException;
 import net.sf.briar.api.db.Status;
 import net.sf.briar.api.protocol.AuthorId;
 import net.sf.briar.api.protocol.BatchId;
-import net.sf.briar.api.protocol.BundleId;
 import net.sf.briar.api.protocol.GroupId;
 import net.sf.briar.api.protocol.Message;
 import net.sf.briar.api.protocol.MessageFactory;
@@ -489,13 +488,10 @@ public class H2DatabaseTest extends TestCase {
 	@Test
 	public void testRetransmission() throws DbException {
-		BundleId bundleId = new BundleId(TestUtils.getRandomId());
-		BundleId bundleId1 = new BundleId(TestUtils.getRandomId());
-		BundleId bundleId2 = new BundleId(TestUtils.getRandomId());
-		BundleId bundleId3 = new BundleId(TestUtils.getRandomId());
-		BundleId bundleId4 = new BundleId(TestUtils.getRandomId());
-		BatchId batchId1 = new BatchId(TestUtils.getRandomId());
-		BatchId batchId2 = new BatchId(TestUtils.getRandomId());
+		BatchId[] ids = new BatchId[Database.RETRANSMIT_THRESHOLD + 5];
+		for(int i = 0; i < ids.length; i++) {
+			ids[i] = new BatchId(TestUtils.getRandomId());
+		}
 		Set<MessageId> empty = Collections.emptySet();
 		Mockery context = new Mockery();
 		MessageFactory messageFactory = context.mock(MessageFactory.class);
@@ -504,30 +500,62 @@ public class H2DatabaseTest extends TestCase {
 		// Add a contact
 		Connection txn = db.startTransaction();
 		assertEquals(contactId, db.addContact(txn, null));
-		// Add an outstanding batch (associated with BundleId.NONE)
-		db.addOutstandingBatch(txn, contactId, batchId, empty);
-		// Receive a bundle
-		Set<BatchId> lost = db.addReceivedBundle(txn, contactId, bundleId);
-		assertTrue(lost.isEmpty());
-		// Add a couple more outstanding batches (associated with bundleId)
-		db.addOutstandingBatch(txn, contactId, batchId1, empty);
-		db.addOutstandingBatch(txn, contactId, batchId2, empty);
-		// Receive another bundle
-		lost = db.addReceivedBundle(txn, contactId, bundleId1);
-		assertTrue(lost.isEmpty());
-		// The contact acks one of the batches - it should not be retransmitted
-		db.removeAckedBatch(txn, contactId, batchId1);
-		// Receive another bundle - batchId should now be considered lost
-		lost = db.addReceivedBundle(txn, contactId, bundleId2);
-		assertEquals(1, lost.size());
-		assertTrue(lost.contains(batchId));
-		// Receive another bundle - batchId2 should now be considered lost
-		lost = db.addReceivedBundle(txn, contactId, bundleId3);
-		assertEquals(1, lost.size());
-		assertTrue(lost.contains(batchId2));
-		// Receive another bundle - no further losses
-		lost = db.addReceivedBundle(txn, contactId, bundleId4);
-		assertTrue(lost.isEmpty());
+		// Add some outstanding batches, a few ms apart
+		for(int i = 0; i < ids.length; i++) {
+			db.addOutstandingBatch(txn, contactId, ids[i], empty);
+			try {
+				Thread.sleep(5);
+			} catch(InterruptedException ignored) {}
+		}
+		// The contact acks the batches in reverse order. The first
+		// RETRANSMIT_THRESHOLD - 1 acks should not trigger any retransmissions
+		for(int i = 0; i < Database.RETRANSMIT_THRESHOLD - 1; i++) {
+			db.removeAckedBatch(txn, contactId, ids[ids.length - i - 1]);
+			Set<BatchId> lost = db.getLostBatches(txn, contactId);
+			assertEquals(Collections.emptySet(), lost);
+		}
+		// The next ack should trigger the retransmission of the remaining
+		// five outstanding batches
+		int index = ids.length - Database.RETRANSMIT_THRESHOLD;
+		db.removeAckedBatch(txn, contactId, ids[index]);
+		Set<BatchId> lost = db.getLostBatches(txn, contactId);
+		for(int i = 0; i < index; i++) {
+			assertTrue(lost.contains(ids[i]));
+		}
 		db.commitTransaction(txn);
 		db.close();
 		context.assertIsSatisfied();
 	}
+
+	@Test
+	public void testNoRetransmission() throws DbException {
+		BatchId[] ids = new BatchId[Database.RETRANSMIT_THRESHOLD * 2];
+		for(int i = 0; i < ids.length; i++) {
+			ids[i] = new BatchId(TestUtils.getRandomId());
+		}
+		Set<MessageId> empty = Collections.emptySet();
+		Mockery context = new Mockery();
+		MessageFactory messageFactory = context.mock(MessageFactory.class);
+		Database<Connection> db = open(false, messageFactory);
+		// Add a contact
+		Connection txn = db.startTransaction();
+		assertEquals(contactId, db.addContact(txn, null));
+		// Add some outstanding batches, a few ms apart
+		for(int i = 0; i < ids.length; i++) {
+			db.addOutstandingBatch(txn, contactId, ids[i], empty);
+			try {
+				Thread.sleep(5);
+			} catch(InterruptedException ignored) {}
+		}
+		// The contact acks the batches in the order they were sent - nothing
+		// should be retransmitted
+		for(int i = 0; i < ids.length; i++) {
+			db.removeAckedBatch(txn, contactId, ids[i]);
+			Set<BatchId> lost = db.getLostBatches(txn, contactId);
+			assertEquals(Collections.emptySet(), lost);
+		}
+		db.commitTransaction(txn);
+		db.close();
+		context.assertIsSatisfied();
+	}
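
H2DatabaseTest drives the same threshold rule through the SQL layer. The schema is not part of this diff, but the behaviour it tests implies a per-contact table of outstanding batches carrying a send timestamp and a count of the acks that have overtaken each batch; getLostBatches would then select the rows where that count has reached Database.RETRANSMIT_THRESHOLD. A hypothetical JDBC sketch of the ack-side bookkeeping - the table and column names are invented, not Briar's schema:

	import java.sql.Connection;
	import java.sql.PreparedStatement;
	import java.sql.ResultSet;
	import java.sql.SQLException;

	void removeAckedBatch(Connection txn, int contactId, byte[] batchId)
			throws SQLException {
		// Find out when the acked batch was sent
		PreparedStatement ps = txn.prepareStatement(
				"SELECT timestamp FROM outstandingBatches"
				+ " WHERE contactId = ? AND batchId = ?");
		ps.setInt(1, contactId);
		ps.setBytes(2, batchId);
		ResultSet rs = ps.executeQuery();
		if(!rs.next()) { // not outstanding, nothing to do
			rs.close();
			ps.close();
			return;
		}
		long sentAt = rs.getLong(1);
		rs.close();
		ps.close();
		// The batch has been acked, so it's no longer outstanding
		ps = txn.prepareStatement(
				"DELETE FROM outstandingBatches"
				+ " WHERE contactId = ? AND batchId = ?");
		ps.setInt(1, contactId);
		ps.setBytes(2, batchId);
		ps.executeUpdate();
		ps.close();
		// Every batch sent earlier has been overtaken by one more ack;
		// getLostBatches() would select rows where acksSinceSent has
		// reached Database.RETRANSMIT_THRESHOLD
		ps = txn.prepareStatement(
				"UPDATE outstandingBatches SET acksSinceSent = acksSinceSent + 1"
				+ " WHERE contactId = ? AND timestamp < ?");
		ps.setInt(1, contactId);
		ps.setLong(2, sentAt);
		ps.executeUpdate();
		ps.close();
	}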


@@ -2,7 +2,6 @@ package net.sf.briar.protocol;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.security.DigestOutputStream;
 import java.security.KeyPair;
 import java.security.KeyPairGenerator;
 import java.security.MessageDigest;
@@ -39,7 +38,8 @@ public class SigningStreamTest extends TestCase {
 		random.nextBytes(input);
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		SigningOutputStream signOut = new SigningOutputStream(out, sig);
+		SigningDigestingOutputStream signOut =
+			new SigningDigestingOutputStream(out, sig, dig);
 		sig.initSign(keyPair.getPrivate());
 		signOut.setSigning(true);
@@ -80,7 +80,8 @@ public class SigningStreamTest extends TestCase {
 		random.nextBytes(input);
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		SigningOutputStream signOut = new SigningOutputStream(out, sig);
+		SigningDigestingOutputStream signOut =
+			new SigningDigestingOutputStream(out, sig, dig);
 		sig.initSign(keyPair.getPrivate());
 		// Sign bytes 0-499, skip bytes 500-749, sign bytes 750-999
@@ -121,16 +122,17 @@ public class SigningStreamTest extends TestCase {
 		random.nextBytes(input);
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		DigestOutputStream digOut = new DigestOutputStream(out, dig);
+		SigningDigestingOutputStream signOut =
+			new SigningDigestingOutputStream(out, sig, dig);
 		dig.reset();
 		// Digest bytes 0-499, skip bytes 500-749, digest bytes 750-999
-		digOut.on(true);
-		digOut.write(input, 0, 500);
-		digOut.on(false);
-		digOut.write(input, 500, 250);
-		digOut.on(true);
-		digOut.write(input, 750, 250);
+		signOut.setDigesting(true);
+		signOut.write(input, 0, 500);
+		signOut.setDigesting(false);
+		signOut.write(input, 500, 250);
+		signOut.setDigesting(true);
+		signOut.write(input, 750, 250);
 		byte[] hash = dig.digest();
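
The last file replaces SigningOutputStream and java.security.DigestOutputStream with a single SigningDigestingOutputStream that can sign and digest the same byte stream, with each function toggled independently. The class itself is not shown in this commit; the following minimal sketch is simply consistent with the calls the test makes (the three-argument constructor, setSigning and setDigesting):

	import java.io.FilterOutputStream;
	import java.io.IOException;
	import java.io.OutputStream;
	import java.security.MessageDigest;
	import java.security.Signature;
	import java.security.SignatureException;

	// Minimal sketch consistent with the test's usage - not the actual Briar class
	class SigningDigestingOutputStream extends FilterOutputStream {

		private final Signature signature;
		private final MessageDigest digest;
		private boolean signing = false, digesting = false;

		SigningDigestingOutputStream(OutputStream out, Signature signature,
				MessageDigest digest) {
			super(out);
			this.signature = signature;
			this.digest = digest;
		}

		void setSigning(boolean signing) {
			this.signing = signing;
		}

		void setDigesting(boolean digesting) {
			this.digesting = digesting;
		}

		@Override
		public void write(int b) throws IOException {
			if(signing) {
				try {
					signature.update((byte) b);
				} catch(SignatureException e) {
					throw new IOException(e.getMessage());
				}
			}
			if(digesting) digest.update((byte) b);
			out.write(b);
		}

		@Override
		public void write(byte[] b, int off, int len) throws IOException {
			if(signing) {
				try {
					signature.update(b, off, len);
				} catch(SignatureException e) {
					throw new IOException(e.getMessage());
				}
			}
			if(digesting) digest.update(b, off, len);
			out.write(b, off, len);
		}
	}

Independent toggling is what the test's skip-bytes-500-749 pattern exercises: the writer can exclude a span of bytes from the signature, the digest, or both, while still writing those bytes to the underlying stream.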