Added support for registering listeners with the database that are

called when new messages are available, and a new method
hasSendableMessages(ContactId) that listeners can call to see whether
it's worth trying to create a batch.
This commit is contained in:
akwizgran
2011-07-27 20:27:43 +01:00
parent e93fbe0b20
commit adee3e121c
10 changed files with 364 additions and 99 deletions

View File

@@ -109,28 +109,28 @@ interface Database<T> {
void addSubscription(T txn, Group g) throws DbException;
/**
* Returns true iff the database contains the given contact.
* Returns true if the database contains the given contact.
* <p>
* Locking: contacts read.
*/
boolean containsContact(T txn, ContactId c) throws DbException;
/**
* Returns true iff the database contains the given message.
* Returns true if the database contains the given message.
* <p>
* Locking: messages read.
*/
boolean containsMessage(T txn, MessageId m) throws DbException;
/**
* Returns true iff the user is subscribed to the given group.
* Returns true if the user is subscribed to the given group.
* <p>
* Locking: subscriptions read.
*/
boolean containsSubscription(T txn, GroupId g) throws DbException;
/**
* Returns true iff the user is subscribed to the given group and the
* Returns true if the user is subscribed to the given group and the
* group is visible to the given contact.
* <p>
* Locking: contacts read, subscriptions read.
@@ -189,7 +189,8 @@ interface Database<T> {
* if the message is not present in the database or is not sendable to the
* given contact.
* <p>
* Locking: contacts read, messages read, messageStatuses read.
* Locking: contacts read, messages read, messageStatuses read,
* subscriptions read.
*/
byte[] getMessageIfSendable(T txn, ContactId c, MessageId m)
throws DbException;
@@ -246,7 +247,8 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, with a total size less than or equal to the given size.
* <p>
* Locking: contacts read, messages read, messageStatuses read.
* Locking: contacts read, messages read, messageStatuses read,
* subscriptions read.
*/
Collection<MessageId> getSendableMessages(T txn, ContactId c, int size)
throws DbException;
@@ -293,6 +295,13 @@ interface Database<T> {
Collection<Group> getVisibleSubscriptions(T txn, ContactId c)
throws DbException;
/**
* Returns true if any messages are sendable to the given contact.
* <p>
* Locking: contacts read, messages read, messageStatuses read.
*/
boolean hasSendableMessages(T txn, ContactId c) throws DbException;
/**
* Removes an outstanding batch that has been acknowledged. Any messages in
* the batch that are still considered outstanding (Status.SENT) with

View File

@@ -25,7 +25,7 @@ interface DatabaseCleaner {
void checkFreeSpaceAndClean() throws DbException;
/**
* Returns true iff the amount of free storage space available to the
* Returns true if the amount of free storage space available to the
* database should be checked.
*/
boolean shouldCheckFreeSpace();

View File

@@ -1,5 +1,8 @@
package net.sf.briar.db;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -7,6 +10,7 @@ import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageListener;
import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Message;
@@ -25,6 +29,8 @@ DatabaseCleaner.Callback {
protected final Database<Txn> db;
protected final DatabaseCleaner cleaner;
private final List<MessageListener> listeners =
new ArrayList<MessageListener>(); // Locking: self
private final Object spaceLock = new Object();
private final Object writeLock = new Object();
private long bytesStoredSinceLastCheck = 0L; // Locking: spaceLock
@@ -41,6 +47,18 @@ DatabaseCleaner.Callback {
cleaner.startCleaning();
}
public void addListener(MessageListener m) {
synchronized(listeners) {
listeners.add(m);
}
}
public void removeListener(MessageListener m) {
synchronized(listeners) {
listeners.remove(m);
}
}
/**
* Removes the oldest messages from the database, with a total size less
* than or equal to the given size.
@@ -61,6 +79,18 @@ DatabaseCleaner.Callback {
return sendability;
}
/** Notifies all MessageListeners that new messages may be available. */
protected void callMessageListeners() {
synchronized(listeners) {
if(!listeners.isEmpty()) {
// Shuffle the listeners so we don't always send new messages
// to contacts in the same order
Collections.shuffle(listeners);
for(MessageListener m : listeners) m.messagesAdded();
}
}
}
public void checkFreeSpaceAndClean() throws DbException {
long freeSpace = db.getFreeSpace();
while(freeSpace < MIN_FREE_SPACE) {
@@ -85,7 +115,7 @@ DatabaseCleaner.Callback {
}
/**
* Returns true iff the database contains the given contact.
* Returns true if the database contains the given contact.
* <p>
* Locking: contacts read.
*/
@@ -121,7 +151,7 @@ DatabaseCleaner.Callback {
if(bytesStoredSinceLastCheck > MAX_BYTES_BETWEEN_SPACE_CHECKS) {
if(LOG.isLoggable(Level.FINE))
LOG.fine(bytesStoredSinceLastCheck
+ " bytes stored since last check");
+ " bytes stored since last check");
bytesStoredSinceLastCheck = 0L;
timeOfLastCheck = now;
return true;
@@ -234,7 +264,7 @@ DatabaseCleaner.Callback {
}
if(LOG.isLoggable(Level.FINE))
LOG.fine(direct + " messages affected directly, "
+ indirect + " indirectly");
+ indirect + " indirectly");
}
/**

View File

@@ -1226,6 +1226,42 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean hasSendableMessages(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messages.messageId FROM messages"
+ " JOIN contactSubscriptions"
+ " ON messages.groupId = contactSubscriptions.groupId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " JOIN statuses ON messages.messageId = statuses.messageId"
+ " WHERE contactSubscriptions.contactId = ?"
+ " AND visibilities.contactId = ?"
+ " AND statuses.contactId = ?"
+ " AND status = ? AND sendability > ZERO()"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, c.getInt());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) Status.NEW.ordinal());
ps.setInt(5, 1);
rs = ps.executeQuery();
boolean found = rs.next();
assert !rs.next();
rs.close();
ps.close();
return found;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public void removeAckedBatch(Connection txn, ContactId c, BatchId b)
throws DbException {
PreparedStatement ps = null;

View File

@@ -151,6 +151,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
boolean added = false;
waitForPermissionToWrite();
contactLock.readLock().lock();
try {
@@ -165,7 +166,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
// Don't store the message if the user has
// unsubscribed from the group
if(db.containsSubscription(txn, m.getGroup())) {
boolean added = storeMessage(txn, m, null);
added = storeMessage(txn, m, null);
if(!added) {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Duplicate local message");
@@ -191,6 +192,8 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} finally {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(added) callMessageListeners();
}
public void findLostBatches(ContactId c) throws DbException {
@@ -293,26 +296,32 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
int bytesSent = 0;
messageStatusLock.readLock().lock();
try {
Txn txn = db.startTransaction();
subscriptionLock.readLock().lock();
try {
int capacity = b.getCapacity();
Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator();
sent = new ArrayList<MessageId>();
while(it.hasNext()) {
MessageId m = it.next();
byte[] message = db.getMessage(txn, m);
if(!b.writeMessage(message)) break;
bytesSent += message.length;
sent.add(m);
Txn txn = db.startTransaction();
try {
int capacity = b.getCapacity();
Collection<MessageId> sendable =
db.getSendableMessages(txn, c, capacity);
Iterator<MessageId> it = sendable.iterator();
sent = new ArrayList<MessageId>();
while(it.hasNext()) {
MessageId m = it.next();
byte[] raw = db.getMessage(txn, m);
if(!b.writeMessage(raw)) break;
bytesSent += raw.length;
sent.add(m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.readLock().unlock();
@@ -351,24 +360,29 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
Collection<MessageId> sent;
messageStatusLock.readLock().lock();
try{
Txn txn = db.startTransaction();
subscriptionLock.readLock().lock();
try {
sent = new ArrayList<MessageId>();
int bytesSent = 0;
for(MessageId m : requested) {
byte[] message = db.getMessageIfSendable(txn, c, m);
if(message == null) continue;
if(!b.writeMessage(message)) break;
bytesSent += message.length;
sent.add(m);
Txn txn = db.startTransaction();
try {
sent = new ArrayList<MessageId>();
int bytesSent = 0;
for(MessageId m : requested) {
byte[] raw = db.getMessageIfSendable(txn, c, m);
if(raw == null) continue;
if(!b.writeMessage(raw)) break;
bytesSent += raw.length;
sent.add(m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.readLock().unlock();
@@ -610,6 +624,39 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public boolean hasSendableMessages(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
messageStatusLock.readLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
boolean has = db.hasSendableMessages(txn, c);
db.commitTransaction(txn);
return has;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.readLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveAck(ContactId c, Ack a) throws DbException {
// Mark all messages in acked batches as seen
contactLock.readLock().lock();
@@ -644,6 +691,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
public void receiveBatch(ContactId c, Batch b) throws DbException {
boolean anyAdded = false;
waitForPermissionToWrite();
contactLock.readLock().lock();
try {
@@ -661,7 +709,10 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
received++;
GroupId g = m.getGroup();
if(db.containsVisibleSubscription(txn, g, c)) {
if(storeMessage(txn, m, c)) stored++;
if(storeMessage(txn, m, c)) {
anyAdded = true;
stored++;
}
}
}
if(LOG.isLoggable(Level.FINE))
@@ -685,6 +736,8 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} finally {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(anyAdded) callMessageListeners();
}
public void receiveOffer(ContactId c, Offer o, RequestWriter r)

View File

@@ -116,6 +116,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
boolean added = false;
waitForPermissionToWrite();
synchronized(contactLock) {
synchronized(messageLock) {
@@ -126,7 +127,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
// Don't store the message if the user has
// unsubscribed from the group
if(db.containsSubscription(txn, m.getGroup())) {
boolean added = storeMessage(txn, m, null);
added = storeMessage(txn, m, null);
if(!added) {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Duplicate local message");
@@ -144,6 +145,8 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
// Call the listeners outside the lock
if(added) callMessageListeners();
}
public void findLostBatches(ContactId c) throws DbException {
@@ -217,31 +220,34 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
int capacity = b.getCapacity();
Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator();
Collection<MessageId> sent = new ArrayList<MessageId>();
int bytesSent = 0;
while(it.hasNext()) {
MessageId m = it.next();
byte[] message = db.getMessage(txn, m);
if(!b.writeMessage(message)) break;
bytesSent += message.length;
sent.add(m);
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
int capacity = b.getCapacity();
Collection<MessageId> sendable =
db.getSendableMessages(txn, c, capacity);
Iterator<MessageId> it = sendable.iterator();
Collection<MessageId> sent =
new ArrayList<MessageId>();
int bytesSent = 0;
while(it.hasNext()) {
MessageId m = it.next();
byte[] raw = db.getMessage(txn, m);
if(!b.writeMessage(raw)) break;
bytesSent += raw.length;
sent.add(m);
}
BatchId id = b.finish();
if(!sent.isEmpty())
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
BatchId id = b.finish();
// Record the contents of the batch, unless it's empty
if(!sent.isEmpty())
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
}
}
@@ -254,29 +260,31 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
Collection<MessageId> sent = new ArrayList<MessageId>();
int bytesSent = 0;
for(MessageId m : requested) {
byte[] message = db.getMessageIfSendable(txn, c, m);
if(message == null) continue;
if(!b.writeMessage(message)) break;
bytesSent += message.length;
sent.add(m);
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Collection<MessageId> sent =
new ArrayList<MessageId>();
int bytesSent = 0;
for(MessageId m : requested) {
byte[] raw = db.getMessageIfSendable(txn, c, m);
if(raw == null) continue;
if(!b.writeMessage(raw)) break;
bytesSent += raw.length;
sent.add(m);
}
BatchId id = b.finish();
if(!sent.isEmpty())
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
return sent;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
BatchId id = b.finish();
// Record the contents of the batch, unless it's empty
if(!sent.isEmpty())
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
return sent;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
}
}
@@ -450,6 +458,27 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public boolean hasSendableMessages(ContactId c) throws DbException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
boolean has = db.hasSendableMessages(txn, c);
db.commitTransaction(txn);
return has;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
}
public void receiveAck(ContactId c, Ack a) throws DbException {
// Mark all messages in acked batches as seen
synchronized(contactLock) {
@@ -475,6 +504,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
public void receiveBatch(ContactId c, Batch b) throws DbException {
boolean anyAdded = false;
waitForPermissionToWrite();
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -488,7 +518,10 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
received++;
GroupId g = m.getGroup();
if(db.containsVisibleSubscription(txn, g, c)) {
if(storeMessage(txn, m, c)) stored++;
if(storeMessage(txn, m, c)) {
anyAdded = true;
stored++;
}
}
}
if(LOG.isLoggable(Level.FINE))
@@ -504,6 +537,8 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
// Call the listeners outside the lock
if(anyAdded) callMessageListeners();
}
public void receiveOffer(ContactId c, Offer o, RequestWriter r)