Rewrote the bundle reading and writing code to eliminate copying. Signatures and digests are now calculated on the fly as the data is read or written. This is a little bit tricky in the case of reading because ReaderImpl uses a lookahead byte, so the signature and message digest need to lag one byte behind.

This commit is contained in:
akwizgran
2011-07-13 16:39:31 +01:00
parent 70c698ca9d
commit e13b0437c3
38 changed files with 655 additions and 1333 deletions

View File

@@ -9,13 +9,9 @@ import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.BatchBuilder;
import net.sf.briar.api.protocol.HeaderBuilder;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import com.google.inject.Provider;
/**
* Abstract superclass containing code shared by ReadWriteLockDatabaseComponent
* and SynchronizedDatabaseComponent.
@@ -28,8 +24,6 @@ DatabaseCleaner.Callback {
protected final Database<Txn> db;
protected final DatabaseCleaner cleaner;
protected final Provider<HeaderBuilder> headerBuilderProvider;
protected final Provider<BatchBuilder> batchBuilderProvider;
private final Object spaceLock = new Object();
private final Object writeLock = new Object();
@@ -37,13 +31,9 @@ DatabaseCleaner.Callback {
private long timeOfLastCheck = 0L; // Locking: spaceLock
private volatile boolean writesAllowed = true;
DatabaseComponentImpl(Database<Txn> db, DatabaseCleaner cleaner,
Provider<HeaderBuilder> headerBuilderProvider,
Provider<BatchBuilder> batchBuilderProvider) {
DatabaseComponentImpl(Database<Txn> db, DatabaseCleaner cleaner) {
this.db = db;
this.cleaner = cleaner;
this.headerBuilderProvider = headerBuilderProvider;
this.batchBuilderProvider = batchBuilderProvider;
}
public void open(boolean resume) throws DbException {

View File

@@ -52,7 +52,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " authorId XXXX NOT NULL,"
+ " timestamp BIGINT NOT NULL,"
+ " size INT NOT NULL,"
+ " body BLOB NOT NULL,"
+ " raw BLOB NOT NULL,"
+ " sendability INT NOT NULL,"
+ " PRIMARY KEY (messageId),"
+ " FOREIGN KEY (groupId) REFERENCES localSubscriptions (groupId)"
@@ -458,7 +458,7 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
String sql = "INSERT INTO messages"
+ " (messageId, parentId, groupId, authorId, timestamp, size,"
+ " body, sendability)"
+ " raw, sendability)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getId().getBytes());
@@ -834,7 +834,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
String sql =
"SELECT parentId, groupId, authorId, timestamp, size, body"
"SELECT parentId, groupId, authorId, timestamp, size, raw"
+ " FROM messages WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
@@ -847,14 +847,14 @@ abstract class JdbcDatabase implements Database<Connection> {
long timestamp = rs.getLong(4);
int size = rs.getInt(5);
Blob b = rs.getBlob(6);
byte[] body = b.getBytes(1, size);
assert body.length == size;
byte[] raw = b.getBytes(1, size);
assert raw.length == size;
boolean more = rs.next();
assert !more;
rs.close();
ps.close();
return messageFactory.createMessage(m, parent, group, author,
timestamp, body);
timestamp, raw);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);

View File

@@ -2,9 +2,10 @@ package net.sf.briar.db;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SignatureException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -17,19 +18,16 @@ import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchBuilder;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.BundleId;
import net.sf.briar.api.protocol.BundleReader;
import net.sf.briar.api.protocol.BundleWriter;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
import net.sf.briar.api.protocol.HeaderBuilder;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import com.google.inject.Inject;
import com.google.inject.Provider;
/**
* An implementation of DatabaseComponent using reentrant read-write locks.
@@ -59,10 +57,8 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
new ReentrantReadWriteLock(true);
@Inject
ReadWriteLockDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner,
Provider<HeaderBuilder> headerBuilderProvider,
Provider<BatchBuilder> batchBuilderProvider) {
super(db, cleaner, headerBuilderProvider, batchBuilderProvider);
ReadWriteLockDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner) {
super(db, cleaner);
}
public void close() throws DbException {
@@ -195,18 +191,18 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void generateBundle(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
HeaderBuilder h;
Set<BatchId> acks;
Set<GroupId> subs;
Map<String, String> transports;
// Add acks
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
h = headerBuilderProvider.get();
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
Set<BatchId> acks = db.removeBatchesToAck(txn, c);
h.addAcks(acks);
acks = db.removeBatchesToAck(txn, c);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + acks.size() + " acks");
db.commitTransaction(txn);
@@ -228,8 +224,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
try {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = db.getSubscriptions(txn);
h.addSubscriptions(subs);
subs = db.getSubscriptions(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
@@ -251,8 +246,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
h.addTransports(transports);
transports = db.getTransports(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
@@ -266,28 +260,16 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} finally {
contactLock.readLock().unlock();
}
// Sign the header and add it to the bundle
Header header = h.build();
long capacity = b.getCapacity();
capacity -= header.getSize();
b.addHeader(header);
// Add the header to the bundle
b.addHeader(acks, subs, transports);
// Add as many messages as possible to the bundle
while(true) {
Batch batch = fillBatch(c, capacity);
if(batch == null) break; // No more messages to send
b.addBatch(batch);
long size = batch.getSize();
capacity -= size;
// If the batch is less than half full, stop trying - there may be
// more messages trickling in but we can't wait forever
if(size * 2 < Batch.MAX_SIZE) break;
}
b.close();
while(fillBatch(c, b));
b.finish();
if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated");
System.gc();
}
private Batch fillBatch(ContactId c, long capacity) throws DbException,
private boolean fillBatch(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
contactLock.readLock().lock();
try {
@@ -295,31 +277,38 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
messageLock.readLock().lock();
try {
Set<MessageId> sent;
Batch batch;
int bytesSent = 0;
BatchId batchId;
messageStatusLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
capacity = Math.min(capacity, Batch.MAX_SIZE);
long capacity = Math.min(b.getRemainingCapacity(),
Batch.MAX_SIZE);
Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator();
if(!it.hasNext()) {
db.commitTransaction(txn);
return null; // No more messages to send
return false; // No more messages to send
}
sent = new HashSet<MessageId>();
BatchBuilder b = batchBuilderProvider.get();
List<Message> messages = new ArrayList<Message>();
while(it.hasNext()) {
MessageId m = it.next();
b.addMessage(db.getMessage(txn, m));
Message message = db.getMessage(txn, m);
bytesSent += message.getSize();
messages.add(message);
sent.add(m);
}
batch = b.build();
batchId = b.addBatch(messages);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(SignatureException e) {
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} catch(GeneralSecurityException e) {
db.abortTransaction(txn);
throw e;
}
@@ -332,9 +321,10 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
Txn txn = db.startTransaction();
try {
assert !sent.isEmpty();
db.addOutstandingBatch(txn, c, batch.getId(), sent);
db.addOutstandingBatch(txn, c, batchId, sent);
db.commitTransaction(txn);
return batch;
// Don't create another batch if this one was half-empty
return bytesSent > Batch.MAX_SIZE / 2;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
@@ -443,9 +433,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void receiveBundle(ContactId c, BundleReader b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received bundle from " + c + ", "
+ b.getSize() + " bytes");
if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c);
Header h;
// Mark all messages in acked batches as seen
contactLock.readLock().lock();
@@ -536,7 +524,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
b.close();
b.finish();
retransmitLostBatches(c, h.getId());
System.gc();
}

View File

@@ -3,8 +3,10 @@ package net.sf.briar.db;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SignatureException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
@@ -16,19 +18,16 @@ import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchBuilder;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.BundleId;
import net.sf.briar.api.protocol.BundleReader;
import net.sf.briar.api.protocol.BundleWriter;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
import net.sf.briar.api.protocol.HeaderBuilder;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import com.google.inject.Inject;
import com.google.inject.Provider;
/**
* An implementation of DatabaseComponent using Java synchronization. This
@@ -52,10 +51,8 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
private final Object transportLock = new Object();
@Inject
SynchronizedDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner,
Provider<HeaderBuilder> headerBuilderProvider,
Provider<BatchBuilder> batchBuilderProvider) {
super(db, cleaner, headerBuilderProvider, batchBuilderProvider);
SynchronizedDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner) {
super(db, cleaner);
}
public void close() throws DbException {
@@ -148,16 +145,16 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void generateBundle(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
HeaderBuilder h;
Set<BatchId> acks;
Set<GroupId> subs;
Map<String, String> transports;
// Add acks
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
h = headerBuilderProvider.get();
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
Set<BatchId> acks = db.removeBatchesToAck(txn, c);
h.addAcks(acks);
acks = db.removeBatchesToAck(txn, c);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + acks.size() + " acks");
db.commitTransaction(txn);
@@ -173,8 +170,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = db.getSubscriptions(txn);
h.addSubscriptions(subs);
subs = db.getSubscriptions(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
@@ -190,8 +186,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
h.addTransports(transports);
transports = db.getTransports(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
@@ -201,28 +196,16 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
// Sign the header and add it to the bundle
Header header = h.build();
long capacity = b.getCapacity();
capacity -= header.getSize();
b.addHeader(header);
// Add the header to the bundle
b.addHeader(acks, subs, transports);
// Add as many messages as possible to the bundle
while(true) {
Batch batch = fillBatch(c, capacity);
if(batch == null) break; // No more messages to send
b.addBatch(batch);
long size = batch.getSize();
capacity -= size;
// If the batch is less than half full, stop trying - there may be
// more messages trickling in but we can't wait forever
if(size * 2 < Batch.MAX_SIZE) break;
}
b.close();
while(fillBatch(c, b));
b.finish();
if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated");
System.gc();
}
private Batch fillBatch(ContactId c, long capacity) throws DbException,
private boolean fillBatch(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -230,26 +213,31 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
capacity = Math.min(capacity, Batch.MAX_SIZE);
long capacity = Math.min(b.getRemainingCapacity(),
Batch.MAX_SIZE);
Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator();
if(!it.hasNext()) {
db.commitTransaction(txn);
return null; // No more messages to send
return false; // No more messages to send
}
BatchBuilder b = batchBuilderProvider.get();
Set<MessageId> sent = new HashSet<MessageId>();
List<Message> messages = new ArrayList<Message>();
int bytesSent = 0;
while(it.hasNext()) {
MessageId m = it.next();
b.addMessage(db.getMessage(txn, m));
Message message = db.getMessage(txn, m);
bytesSent += message.getSize();
messages.add(message);
sent.add(m);
}
Batch batch = b.build();
BatchId batchId = b.addBatch(messages);
// Record the contents of the batch
assert !sent.isEmpty();
db.addOutstandingBatch(txn, c, batch.getId(), sent);
db.addOutstandingBatch(txn, c, batchId, sent);
db.commitTransaction(txn);
return batch;
// Don't create another batch if this one was half-empty
return bytesSent > Batch.MAX_SIZE / 2;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
@@ -337,9 +325,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void receiveBundle(ContactId c, BundleReader b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received bundle from " + c + ", "
+ b.getSize() + " bytes");
if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c);
Header h;
// Mark all messages in acked batches as seen
synchronized(contactLock) {
@@ -409,7 +395,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
b.close();
b.finish();
retransmitLostBatches(c, h.getId());
System.gc();
}