mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 18:59:06 +01:00
Protocol refactoring. Each bundle now consists of a signed header and zero or more signed batches. There is no overall signature on the bundle, since the bundle's contents may need to be processed before the entire bundle has been read. The protocol does not prevent an adversary from removing batches from a bundle, reordering batches, moving them from one bundle to another, etc. However, since each batch is signed and acknowledged independently, no such guarantees are required. Bundle IDs will go away when the retransmission mechanism is changed.
This commit is contained in:
@@ -1,9 +1,10 @@
|
||||
package net.sf.briar.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.SignatureException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
@@ -19,6 +20,8 @@ import net.sf.briar.api.protocol.BatchId;
|
||||
import net.sf.briar.api.protocol.Bundle;
|
||||
import net.sf.briar.api.protocol.BundleBuilder;
|
||||
import net.sf.briar.api.protocol.GroupId;
|
||||
import net.sf.briar.api.protocol.Header;
|
||||
import net.sf.briar.api.protocol.HeaderBuilder;
|
||||
import net.sf.briar.api.protocol.Message;
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
|
||||
@@ -48,8 +51,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
|
||||
@Inject
|
||||
SynchronizedDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner,
|
||||
Provider<HeaderBuilder> headerBuilderProvider,
|
||||
Provider<BatchBuilder> batchBuilderProvider) {
|
||||
super(db, cleaner, batchBuilderProvider);
|
||||
super(db, cleaner, headerBuilderProvider, batchBuilderProvider);
|
||||
}
|
||||
|
||||
public void close() throws DbException {
|
||||
@@ -140,21 +144,20 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
|
||||
public Bundle generateBundle(ContactId c, BundleBuilder b)
|
||||
throws DbException {
|
||||
throws DbException, IOException, SignatureException {
|
||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
|
||||
// Ack all batches received from c
|
||||
HeaderBuilder h;
|
||||
// Add acks
|
||||
synchronized(contactLock) {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
h = headerBuilderProvider.get();
|
||||
synchronized(messageStatusLock) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int numAcks = 0;
|
||||
for(BatchId ack : db.removeBatchesToAck(txn, c)) {
|
||||
b.addAck(ack);
|
||||
numAcks++;
|
||||
}
|
||||
Set<BatchId> acks = db.removeBatchesToAck(txn, c);
|
||||
h.addAcks(acks);
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Added " + numAcks + " acks");
|
||||
LOG.fine("Added " + acks.size() + " acks");
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -162,19 +165,16 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add a list of subscriptions
|
||||
// Add subscriptions
|
||||
synchronized(contactLock) {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int numSubs = 0;
|
||||
for(GroupId g : db.getSubscriptions(txn)) {
|
||||
b.addSubscription(g);
|
||||
numSubs++;
|
||||
}
|
||||
Set<GroupId> subs = db.getSubscriptions(txn);
|
||||
h.addSubscriptions(subs);
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Added " + numSubs + " subscriptions");
|
||||
LOG.fine("Added " + subs.size() + " subscriptions");
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -188,14 +188,10 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
synchronized(transportLock) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int numTransports = 0;
|
||||
Map<String, String> transports = db.getTransports(txn);
|
||||
for(Entry<String, String> e : transports.entrySet()) {
|
||||
b.addTransport(e.getKey(), e.getValue());
|
||||
numTransports++;
|
||||
}
|
||||
h.addTransports(transports);
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Added " + numTransports + " transports");
|
||||
LOG.fine("Added " + transports.size() + " transports");
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -203,8 +199,12 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add as many messages as possible to the bundle
|
||||
// Sign the header and add it to the bundle
|
||||
Header header = h.build();
|
||||
long capacity = b.getCapacity();
|
||||
capacity -= header.getSize();
|
||||
b.addHeader(header);
|
||||
// Add as many messages as possible to the bundle
|
||||
while(true) {
|
||||
Batch batch = fillBatch(c, capacity);
|
||||
if(batch == null) break; // No more messages to send
|
||||
@@ -213,7 +213,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
capacity -= size;
|
||||
// If the batch is less than half full, stop trying - there may be
|
||||
// more messages trickling in but we can't wait forever
|
||||
if(size * 2 < Batch.CAPACITY) break;
|
||||
if(size * 2 < Batch.MAX_SIZE) break;
|
||||
}
|
||||
Bundle bundle = b.build();
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
@@ -222,14 +222,15 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
return bundle;
|
||||
}
|
||||
|
||||
private Batch fillBatch(ContactId c, long capacity) throws DbException {
|
||||
private Batch fillBatch(ContactId c, long capacity) throws DbException,
|
||||
SignatureException {
|
||||
synchronized(contactLock) {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
synchronized(messageLock) {
|
||||
synchronized(messageStatusLock) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
capacity = Math.min(capacity, Batch.CAPACITY);
|
||||
capacity = Math.min(capacity, Batch.MAX_SIZE);
|
||||
Iterator<MessageId> it =
|
||||
db.getSendableMessages(txn, c, capacity).iterator();
|
||||
if(!it.hasNext()) {
|
||||
@@ -252,6 +253,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
throw e;
|
||||
} catch(SignatureException e) {
|
||||
db.abortTransaction(txn);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -331,18 +335,20 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
}
|
||||
|
||||
public void receiveBundle(ContactId c, Bundle b) throws DbException {
|
||||
public void receiveBundle(ContactId c, Bundle b) throws DbException,
|
||||
IOException, SignatureException {
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Received bundle from " + c + ", "
|
||||
+ b.getSize() + " bytes");
|
||||
Header h;
|
||||
// Mark all messages in acked batches as seen
|
||||
synchronized(contactLock) {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
h = b.getHeader();
|
||||
synchronized(messageLock) {
|
||||
synchronized(messageStatusLock) {
|
||||
int acks = 0;
|
||||
for(BatchId ack : b.getAcks()) {
|
||||
acks++;
|
||||
Set<BatchId> acks = h.getAcks();
|
||||
for(BatchId ack : acks) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.removeAckedBatch(txn, c, ack);
|
||||
@@ -353,7 +359,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
}
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Received " + acks + " acks");
|
||||
LOG.fine("Received " + acks.size() + " acks");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -363,14 +369,12 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
// FIXME: Replace clearSubs and addSub with setSubs
|
||||
db.clearSubscriptions(txn, c);
|
||||
int subs = 0;
|
||||
for(GroupId g : b.getSubscriptions()) {
|
||||
subs++;
|
||||
db.addSubscription(txn, c, g);
|
||||
}
|
||||
Set<GroupId> subs = h.getSubscriptions();
|
||||
for(GroupId sub : subs) db.addSubscription(txn, c, sub);
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Received " + subs + " subscriptions");
|
||||
LOG.fine("Received " + subs.size() + " subscriptions");
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -384,7 +388,11 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
synchronized(transportLock) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.setTransports(txn, c, b.getTransports());
|
||||
Map<String, String> transports = h.getTransports();
|
||||
db.setTransports(txn, c, transports);
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Received " + transports.size()
|
||||
+ " transports");
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -394,7 +402,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
// Store the messages
|
||||
int batches = 0;
|
||||
for(Batch batch : b.getBatches()) {
|
||||
for(Batch batch = b.getNextBatch(); batch != null; batch = b.getNextBatch()) {
|
||||
batches++;
|
||||
waitForPermissionToWrite();
|
||||
synchronized(contactLock) {
|
||||
@@ -436,7 +444,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
synchronized(messageStatusLock) {
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
lost = db.addReceivedBundle(txn, c, b.getId());
|
||||
lost = db.addReceivedBundle(txn, c, h.getId());
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
|
||||
Reference in New Issue
Block a user