DatabaseComponent.generateBatch() now returns a boolean.

This allows a connection to know whether to try writing another batch
immediately, or to wait for an event from the DB.
This commit is contained in:
akwizgran
2011-09-20 14:45:07 +01:00
parent 75446b7f7e
commit 5548eb32cd
3 changed files with 24 additions and 24 deletions

View File

@@ -73,17 +73,21 @@ public interface DatabaseComponent {
void generateAck(ContactId c, AckWriter a) throws DbException,
IOException;
/** Generates a batch of messages for the given contact. */
void generateBatch(ContactId c, BatchWriter b) throws DbException,
/**
* Generates a batch of messages for the given contact.
* @return True if any messages were added to tbe batch.
*/
boolean generateBatch(ContactId c, BatchWriter b) throws DbException,
IOException;
/**
* Generates a batch of messages for the given contact from the given
* collection of requested messages, and returns the IDs of any messages
* that were either added to the batch, or were considered for inclusion
* but are no longer sendable to the contact.
* collection of requested messages. Any messages that were either added to
* the batch, or were considered but are no longer sendable to the contact,
* are removed from the collection of requested messages before returning.
* @return True if any messages were added to the batch.
*/
Collection<MessageId> generateBatch(ContactId c, BatchWriter b,
boolean generateBatch(ContactId c, BatchWriter b,
Collection<MessageId> requested) throws DbException, IOException;
/**

View File

@@ -408,29 +408,26 @@ DatabaseCleaner.Callback {
}
}
public void generateBatch(ContactId c, BatchWriter b) throws DbException,
public boolean generateBatch(ContactId c, BatchWriter b) throws DbException,
IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
Collection<MessageId> sent;
int bytesSent = 0;
Collection<MessageId> sent = new ArrayList<MessageId>();
messageStatusLock.readLock().lock();
try {
subscriptionLock.readLock().lock();
try {
T txn = db.startTransaction();
try {
sent = new ArrayList<MessageId>();
int capacity = b.getCapacity();
Collection<MessageId> sendable =
db.getSendableMessages(txn, c, capacity);
for(MessageId m : sendable) {
byte[] raw = db.getMessage(txn, m);
if(!b.writeMessage(raw)) break;
bytesSent += raw.length;
sent.add(m);
}
db.commitTransaction(txn);
@@ -448,7 +445,7 @@ DatabaseCleaner.Callback {
messageStatusLock.readLock().unlock();
}
// Record the contents of the batch, unless it's empty
if(sent.isEmpty()) return;
if(sent.isEmpty()) return false;
BatchId id = b.finish();
messageStatusLock.writeLock().lock();
try {
@@ -456,6 +453,7 @@ DatabaseCleaner.Callback {
try {
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
return true;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
@@ -471,24 +469,23 @@ DatabaseCleaner.Callback {
}
}
public Collection<MessageId> generateBatch(ContactId c, BatchWriter b,
public boolean generateBatch(ContactId c, BatchWriter b,
Collection<MessageId> requested) throws DbException, IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
Collection<MessageId> sent, considered;
Collection<MessageId> sent = new ArrayList<MessageId>();
messageStatusLock.readLock().lock();
try{
subscriptionLock.readLock().lock();
try {
T txn = db.startTransaction();
try {
sent = new ArrayList<MessageId>();
considered = new ArrayList<MessageId>();
int bytesSent = 0;
for(MessageId m : requested) {
Iterator<MessageId> it = requested.iterator();
while(it.hasNext()) {
MessageId m = it.next();
byte[] raw = db.getMessageIfSendable(txn, c, m);
// If the message is still sendable, try to add
// it to the batch. If the batch is full, don't
@@ -496,10 +493,9 @@ DatabaseCleaner.Callback {
// try to add any further messages.
if(raw != null) {
if(!b.writeMessage(raw)) break;
bytesSent += raw.length;
sent.add(m);
}
considered.add(m);
it.remove();
}
db.commitTransaction(txn);
} catch(DbException e) {
@@ -516,7 +512,7 @@ DatabaseCleaner.Callback {
messageStatusLock.readLock().unlock();
}
// Record the contents of the batch, unless it's empty
if(sent.isEmpty()) return considered;
if(sent.isEmpty()) return false;
BatchId id = b.finish();
messageStatusLock.writeLock().lock();
try {
@@ -524,7 +520,7 @@ DatabaseCleaner.Callback {
try {
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
return considered;
return true;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;

View File

@@ -41,8 +41,8 @@ class BatchWriterImpl implements BatchWriter {
started = true;
}
// Allow one byte for the list end tag
int capacity = ProtocolConstants.MAX_PACKET_LENGTH
- (int) w.getBytesWritten() - 1;
int capacity =
ProtocolConstants.MAX_PACKET_LENGTH - (int) w.getBytesWritten() - 1;
if(capacity < message.length) return false;
// Bypass the writer and write each raw message directly
out.write(message);