Don't do IO while holding database locks.

This commit is contained in:
akwizgran
2011-09-23 12:55:23 +01:00
parent 09971c8460
commit b675c38953
5 changed files with 85 additions and 67 deletions

View File

@@ -7,9 +7,12 @@ import net.sf.briar.api.protocol.BatchId;
/** An interface for creating a batch packet. */ /** An interface for creating a batch packet. */
public interface BatchWriter { public interface BatchWriter {
/** Returns the capacity of the batch. */
int getCapacity();
/** /**
* Sets the maximum length of the serialised batch. If this method is not * Sets the maximum length of the serialised batch; the default is
* called, the default is ProtocolConstants.MAX_PACKET_LENGTH; * ProtocolConstants.MAX_PACKET_LENGTH;
*/ */
void setMaxPacketLength(int length); void setMaxPacketLength(int length);

View File

@@ -287,7 +287,7 @@ interface Database<T> {
* Locking: contacts read, messages read, messageStatuses read, * Locking: contacts read, messages read, messageStatuses read,
* subscriptions read. * subscriptions read.
*/ */
Collection<MessageId> getSendableMessages(T txn, ContactId c, int size) Collection<MessageId> getSendableMessages(T txn, ContactId c, int capacity)
throws DbException; throws DbException;
/** /**

View File

@@ -12,16 +12,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import com.google.inject.Inject; import net.sf.briar.api.Bytes;
import net.sf.briar.api.ContactId; import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating; import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseListener; import net.sf.briar.api.db.DatabaseListener;
import net.sf.briar.api.db.DatabaseListener.Event;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.db.Status; import net.sf.briar.api.db.Status;
import net.sf.briar.api.db.DatabaseListener.Event;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.Batch;
@@ -31,7 +30,6 @@ import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolConstants;
import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.AckWriter;
@@ -42,6 +40,8 @@ import net.sf.briar.api.protocol.writers.SubscriptionWriter;
import net.sf.briar.api.protocol.writers.TransportWriter; import net.sf.briar.api.protocol.writers.TransportWriter;
import net.sf.briar.api.transport.ConnectionWindow; import net.sf.briar.api.transport.ConnectionWindow;
import com.google.inject.Inject;
/** /**
* An implementation of DatabaseComponent using reentrant read-write locks. * An implementation of DatabaseComponent using reentrant read-write locks.
* Depending on the JVM's lock implementation, this implementation may allow * Depending on the JVM's lock implementation, this implementation may allow
@@ -426,33 +426,30 @@ DatabaseCleaner.Callback {
public boolean generateBatch(ContactId c, BatchWriter b) throws DbException, public boolean generateBatch(ContactId c, BatchWriter b) throws DbException,
IOException { IOException {
Collection<MessageId> ids = new ArrayList<MessageId>();
Collection<Bytes> messages = new ArrayList<Bytes>();
// Get some sendable messages from the database
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
if(!containsContact(c)) throw new NoSuchContactException(); if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock(); messageLock.readLock().lock();
try { try {
Collection<MessageId> sent = new ArrayList<MessageId>();
messageStatusLock.readLock().lock(); messageStatusLock.readLock().lock();
try { try {
subscriptionLock.readLock().lock(); subscriptionLock.readLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
try { try {
int capacity = ProtocolConstants.MAX_PACKET_LENGTH; int capacity = b.getCapacity();
Collection<MessageId> sendable = ids = db.getSendableMessages(txn, c, capacity);
db.getSendableMessages(txn, c, capacity); for(MessageId m : ids) {
for(MessageId m : sendable) {
byte[] raw = db.getMessage(txn, m); byte[] raw = db.getMessage(txn, m);
if(!b.writeMessage(raw)) break; messages.add(new Bytes(raw));
sent.add(m);
} }
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
throw e; throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} }
} finally { } finally {
subscriptionLock.readLock().unlock(); subscriptionLock.readLock().unlock();
@@ -460,16 +457,41 @@ DatabaseCleaner.Callback {
} finally { } finally {
messageStatusLock.readLock().unlock(); messageStatusLock.readLock().unlock();
} }
// Record the contents of the batch, unless it's empty } finally {
if(sent.isEmpty()) return false; messageLock.readLock().unlock();
BatchId id = b.finish(); }
} finally {
contactLock.readLock().unlock();
}
if(ids.isEmpty()) return false;
writeAndRecordBatch(c, b, ids, messages);
return true;
}
private void writeAndRecordBatch(ContactId c, BatchWriter b,
Collection<MessageId> ids, Collection<Bytes> messages)
throws DbException, IOException {
assert !ids.isEmpty();
assert !messages.isEmpty();
assert ids.size() == messages.size();
// Add the messages to the batch
for(Bytes raw : messages) {
boolean written = b.writeMessage(raw.getBytes());
assert written;
}
BatchId id = b.finish();
// Record the contents of the batch
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
messageStatusLock.writeLock().lock(); messageStatusLock.writeLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
try { try {
db.addOutstandingBatch(txn, c, id, sent); db.addOutstandingBatch(txn, c, id, ids);
db.commitTransaction(txn); db.commitTransaction(txn);
return true;
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
throw e; throw e;
@@ -487,29 +509,29 @@ DatabaseCleaner.Callback {
public boolean generateBatch(ContactId c, BatchWriter b, public boolean generateBatch(ContactId c, BatchWriter b,
Collection<MessageId> requested) throws DbException, IOException { Collection<MessageId> requested) throws DbException, IOException {
Collection<MessageId> ids = new ArrayList<MessageId>();
Collection<Bytes> messages = new ArrayList<Bytes>();
// Get some sendable messages from the database
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
if(!containsContact(c)) throw new NoSuchContactException(); if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock(); messageLock.readLock().lock();
try { try {
Collection<MessageId> sent = new ArrayList<MessageId>();
messageStatusLock.readLock().lock(); messageStatusLock.readLock().lock();
try{ try{
subscriptionLock.readLock().lock(); subscriptionLock.readLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
try { try {
int capacity = b.getCapacity();
Iterator<MessageId> it = requested.iterator(); Iterator<MessageId> it = requested.iterator();
while(it.hasNext()) { while(it.hasNext()) {
MessageId m = it.next(); MessageId m = it.next();
// If the message is still sendable, try to add
// it to the batch
byte[] raw = db.getMessageIfSendable(txn, c, m); byte[] raw = db.getMessageIfSendable(txn, c, m);
if(raw != null) { if(raw != null) {
// If the batch is full, don't treat the if(raw.length > capacity) break;
// message as considered ids.add(m);
if(!b.writeMessage(raw)) break; messages.add(new Bytes(raw));
sent.add(m);
} }
it.remove(); it.remove();
} }
@@ -517,9 +539,6 @@ DatabaseCleaner.Callback {
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
throw e; throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} }
} finally { } finally {
subscriptionLock.readLock().unlock(); subscriptionLock.readLock().unlock();
@@ -527,29 +546,15 @@ DatabaseCleaner.Callback {
} finally { } finally {
messageStatusLock.readLock().unlock(); messageStatusLock.readLock().unlock();
} }
// Record the contents of the batch, unless it's empty
if(sent.isEmpty()) return false;
BatchId id = b.finish();
messageStatusLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
return true;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally { } finally {
messageLock.readLock().unlock(); messageLock.readLock().unlock();
} }
} finally { } finally {
contactLock.readLock().unlock(); contactLock.readLock().unlock();
} }
if(ids.isEmpty()) return false;
writeAndRecordBatch(c, b, ids, messages);
return true;
} }
public Collection<MessageId> generateOffer(ContactId c, OfferWriter o) public Collection<MessageId> generateOffer(ContactId c, OfferWriter o)

View File

@@ -22,6 +22,7 @@ class BatchWriterImpl implements BatchWriter {
private boolean started = false; private boolean started = false;
private int capacity = ProtocolConstants.MAX_PACKET_LENGTH; private int capacity = ProtocolConstants.MAX_PACKET_LENGTH;
private int remaining = capacity;
BatchWriterImpl(OutputStream out, SerialComponent serial, BatchWriterImpl(OutputStream out, SerialComponent serial,
WriterFactory writerFactory, MessageDigest messageDigest) { WriterFactory writerFactory, MessageDigest messageDigest) {
@@ -33,20 +34,24 @@ class BatchWriterImpl implements BatchWriter {
this.messageDigest = messageDigest; this.messageDigest = messageDigest;
} }
public int getCapacity() {
return capacity - headerLength - footerLength;
}
public void setMaxPacketLength(int length) { public void setMaxPacketLength(int length) {
if(started) throw new IllegalStateException(); if(started) throw new IllegalStateException();
if(length < 0 || length > ProtocolConstants.MAX_PACKET_LENGTH) if(length < 0 || length > ProtocolConstants.MAX_PACKET_LENGTH)
throw new IllegalArgumentException(); throw new IllegalArgumentException();
capacity = length; remaining = capacity = length;
} }
public boolean writeMessage(byte[] message) throws IOException { public boolean writeMessage(byte[] message) throws IOException {
int overhead = started ? footerLength : headerLength + footerLength; int overhead = started ? footerLength : headerLength + footerLength;
if(capacity < message.length + overhead) return false; if(remaining < message.length + overhead) return false;
if(!started) start(); if(!started) start();
// Bypass the writer and write the raw message directly // Bypass the writer and write the raw message directly
out.write(message); out.write(message);
capacity -= message.length; remaining -= message.length;
return true; return true;
} }
@@ -54,7 +59,7 @@ class BatchWriterImpl implements BatchWriter {
if(!started) start(); if(!started) start();
w.writeListEnd(); w.writeListEnd();
out.flush(); out.flush();
capacity = ProtocolConstants.MAX_PACKET_LENGTH; remaining = capacity = ProtocolConstants.MAX_PACKET_LENGTH;
started = false; started = false;
return new BatchId(messageDigest.digest()); return new BatchId(messageDigest.digest());
} }
@@ -63,7 +68,7 @@ class BatchWriterImpl implements BatchWriter {
messageDigest.reset(); messageDigest.reset();
w.writeUserDefinedId(Types.BATCH); w.writeUserDefinedId(Types.BATCH);
w.writeListStart(); w.writeListStart();
capacity -= headerLength; remaining -= headerLength;
started = true; started = true;
} }
} }

View File

@@ -738,24 +738,27 @@ public abstract class DatabaseComponentTest extends TestCase {
allowing(database).commitTransaction(txn); allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId); allowing(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
// Find out how much space we've got
oneOf(batchWriter).getCapacity();
will(returnValue(ProtocolConstants.MAX_PACKET_LENGTH));
// Get the sendable messages // Get the sendable messages
oneOf(database).getSendableMessages(txn, contactId, oneOf(database).getSendableMessages(txn, contactId,
ProtocolConstants.MAX_PACKET_LENGTH); ProtocolConstants.MAX_PACKET_LENGTH);
will(returnValue(sendable)); will(returnValue(sendable));
// Try to add both messages to the writer - only manage to add one
oneOf(database).getMessage(txn, messageId); oneOf(database).getMessage(txn, messageId);
will(returnValue(raw)); will(returnValue(raw));
oneOf(batchWriter).writeMessage(raw);
will(returnValue(true));
oneOf(database).getMessage(txn, messageId1); oneOf(database).getMessage(txn, messageId1);
will(returnValue(raw1)); will(returnValue(raw1));
// Add the sendable messages to the batch
oneOf(batchWriter).writeMessage(raw);
will(returnValue(true));
oneOf(batchWriter).writeMessage(raw1); oneOf(batchWriter).writeMessage(raw1);
will(returnValue(false)); will(returnValue(true));
oneOf(batchWriter).finish(); oneOf(batchWriter).finish();
will(returnValue(batchId)); will(returnValue(batchId));
// Record the message that was sent // Record the message that was sent
oneOf(database).addOutstandingBatch(txn, contactId, batchId, oneOf(database).addOutstandingBatch(txn, contactId, batchId,
Collections.singletonList(messageId)); sendable);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner); DatabaseComponent db = createDatabaseComponent(database, cleaner);
@@ -784,22 +787,24 @@ public abstract class DatabaseComponentTest extends TestCase {
allowing(database).commitTransaction(txn); allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId); allowing(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
// Try to get the requested messages and add them to the writer // Find out how much space we've got
oneOf(batchWriter).getCapacity();
will(returnValue(ProtocolConstants.MAX_PACKET_LENGTH));
// Try to get the requested messages
oneOf(database).getMessageIfSendable(txn, contactId, messageId); oneOf(database).getMessageIfSendable(txn, contactId, messageId);
will(returnValue(raw)); // Message is sendable
oneOf(batchWriter).writeMessage(raw);
will(returnValue(true)); // Message added to batch
oneOf(database).getMessageIfSendable(txn, contactId, messageId1);
will(returnValue(null)); // Message is not sendable will(returnValue(null)); // Message is not sendable
oneOf(database).getMessageIfSendable(txn, contactId, messageId2); oneOf(database).getMessageIfSendable(txn, contactId, messageId1);
will(returnValue(raw1)); // Message is sendable will(returnValue(raw1)); // Message is sendable
oneOf(database).getMessageIfSendable(txn, contactId, messageId2);
will(returnValue(null)); // Message is not sendable
// Add the sendable message to the batch
oneOf(batchWriter).writeMessage(raw1); oneOf(batchWriter).writeMessage(raw1);
will(returnValue(false)); // Message not added to batch will(returnValue(true));
oneOf(batchWriter).finish(); oneOf(batchWriter).finish();
will(returnValue(batchId)); will(returnValue(batchId));
// Record the message that was sent // Record the message that was sent
oneOf(database).addOutstandingBatch(txn, contactId, batchId, oneOf(database).addOutstandingBatch(txn, contactId, batchId,
Collections.singletonList(messageId)); Collections.singletonList(messageId1));
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner); DatabaseComponent db = createDatabaseComponent(database, cleaner);