Builders for incoming and outgoing headers and batches. The protocol and serial components can now be used to serialise, sign, deserialise and verify real bundles (except for message parsing).

This commit is contained in:
akwizgran
2011-07-12 16:50:20 +01:00
parent e0509db45d
commit 3d549ea6ac
33 changed files with 703 additions and 294 deletions

View File

@@ -1,6 +1,7 @@
package net.sf.briar.db;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SignatureException;
import java.util.HashSet;
import java.util.Iterator;
@@ -17,8 +18,9 @@ import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchBuilder;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Bundle;
import net.sf.briar.api.protocol.BundleBuilder;
import net.sf.briar.api.protocol.BundleId;
import net.sf.briar.api.protocol.BundleReader;
import net.sf.briar.api.protocol.BundleWriter;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
import net.sf.briar.api.protocol.HeaderBuilder;
@@ -143,8 +145,8 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public Bundle generateBundle(ContactId c, BundleBuilder b)
throws DbException, IOException, SignatureException {
public void generateBundle(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
HeaderBuilder h;
// Add acks
@@ -215,15 +217,13 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
// more messages trickling in but we can't wait forever
if(size * 2 < Batch.MAX_SIZE) break;
}
Bundle bundle = b.build();
if(LOG.isLoggable(Level.FINE))
LOG.fine("Bundle generated, " + bundle.getSize() + " bytes");
b.close();
if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated");
System.gc();
return bundle;
}
private Batch fillBatch(ContactId c, long capacity) throws DbException,
SignatureException {
IOException, GeneralSecurityException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
@@ -335,8 +335,8 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public void receiveBundle(ContactId c, Bundle b) throws DbException,
IOException, SignatureException {
public void receiveBundle(ContactId c, BundleReader b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received bundle from " + c + ", "
+ b.getSize() + " bytes");
@@ -402,40 +402,52 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
// Store the messages
int batches = 0;
for(Batch batch = b.getNextBatch(); batch != null; batch = b.getNextBatch()) {
Batch batch = null;
while((batch = b.getNextBatch()) != null) {
storeBatch(c, batch);
batches++;
waitForPermissionToWrite();
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : batch.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsSubscription(txn, g)) {
if(storeMessage(txn, m, c)) stored++;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
b.close();
retransmitLostBatches(c, h.getId());
System.gc();
}
private void storeBatch(ContactId c, Batch b) throws DbException {
waitForPermissionToWrite();
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : b.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsSubscription(txn, g)) {
if(storeMessage(txn, m, c)) stored++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, c, batch.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, c, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
}
private void retransmitLostBatches(ContactId c, BundleId b)
throws DbException {
// Find any lost batches that need to be retransmitted
Set<BatchId> lost;
synchronized(contactLock) {
@@ -444,7 +456,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
lost = db.addReceivedBundle(txn, c, h.getId());
lost = db.addReceivedBundle(txn, c, b);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -472,7 +484,6 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
System.gc();
}
public void removeContact(ContactId c) throws DbException {