Listeners for subscription changes.

This commit is contained in:
akwizgran
2011-08-02 09:22:54 +01:00
parent 0d0885bf4b
commit ff0909a0e9
7 changed files with 76 additions and 36 deletions

View File

@@ -48,11 +48,11 @@ public interface DatabaseComponent {
/** Waits for any open transactions to finish and closes the database. */ /** Waits for any open transactions to finish and closes the database. */
void close() throws DbException; void close() throws DbException;
/** Adds a listener to be notified when new messages are available. */ /** Adds a listener to be notified when database events occur. */
void addListener(MessageListener m); void addListener(DatabaseListener d);
/** Removes a listener. */ /** Removes a listener. */
void removeListener(MessageListener m); void removeListener(DatabaseListener d);
/** /**
* Adds a new contact to the database with the given transport details and * Adds a new contact to the database with the given transport details and

View File

@@ -0,0 +1,13 @@
package net.sf.briar.api.db;
/** An interface for receiving notifications when database events occur. */
public interface DatabaseListener {
static enum Event {
MESSAGES_ADDED,
SUBSCRIPTIONS_UPDATED,
TRANSPORTS_UPDATED
};
void eventOccurred(Event e);
}

View File

@@ -1,10 +0,0 @@
package net.sf.briar.api.db;
/**
* An interface for receiving notifications when the database may have new
* messages available.
*/
public interface MessageListener {
void messagesAdded();
}

View File

@@ -10,7 +10,7 @@ import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating; import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageListener; import net.sf.briar.api.db.DatabaseListener;
import net.sf.briar.api.db.Status; import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -29,8 +29,8 @@ DatabaseCleaner.Callback {
protected final Database<Txn> db; protected final Database<Txn> db;
protected final DatabaseCleaner cleaner; protected final DatabaseCleaner cleaner;
private final List<MessageListener> listeners = private final List<DatabaseListener> listeners =
new ArrayList<MessageListener>(); // Locking: self new ArrayList<DatabaseListener>(); // Locking: self
private final Object spaceLock = new Object(); private final Object spaceLock = new Object();
private final Object writeLock = new Object(); private final Object writeLock = new Object();
private long bytesStoredSinceLastCheck = 0L; // Locking: spaceLock private long bytesStoredSinceLastCheck = 0L; // Locking: spaceLock
@@ -47,15 +47,15 @@ DatabaseCleaner.Callback {
cleaner.startCleaning(); cleaner.startCleaning();
} }
public void addListener(MessageListener m) { public void addListener(DatabaseListener d) {
synchronized(listeners) { synchronized(listeners) {
listeners.add(m); listeners.add(d);
} }
} }
public void removeListener(MessageListener m) { public void removeListener(DatabaseListener d) {
synchronized(listeners) { synchronized(listeners) {
listeners.remove(m); listeners.remove(d);
} }
} }
@@ -80,13 +80,13 @@ DatabaseCleaner.Callback {
} }
/** Notifies all MessageListeners that new messages may be available. */ /** Notifies all MessageListeners that new messages may be available. */
protected void callMessageListeners() { protected void callListeners(DatabaseListener.Event e) {
synchronized(listeners) { synchronized(listeners) {
if(!listeners.isEmpty()) { if(!listeners.isEmpty()) {
// Shuffle the listeners so we don't always send new messages // Shuffle the listeners so we don't always send new packets
// to contacts in the same order // to contacts in the same order
Collections.shuffle(listeners); Collections.shuffle(listeners);
for(MessageListener m : listeners) m.messagesAdded(); for(DatabaseListener d : listeners) d.eventOccurred(e);
} }
} }
} }

View File

@@ -12,6 +12,7 @@ import java.util.logging.Logger;
import net.sf.briar.api.ContactId; import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating; import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseListener;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
@@ -193,7 +194,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
contactLock.readLock().unlock(); contactLock.readLock().unlock();
} }
// Call the listeners outside the lock // Call the listeners outside the lock
if(added) callMessageListeners(); if(added) callListeners(DatabaseListener.Event.MESSAGES_ADDED);
} }
public void findLostBatches(ContactId c) throws DbException { public void findLostBatches(ContactId c) throws DbException {
@@ -742,7 +743,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
contactLock.readLock().unlock(); contactLock.readLock().unlock();
} }
// Call the listeners outside the lock // Call the listeners outside the lock
if(anyAdded) callMessageListeners(); if(anyAdded) callListeners(DatabaseListener.Event.MESSAGES_ADDED);
} }
public void receiveOffer(ContactId c, Offer o, RequestWriter r) public void receiveOffer(ContactId c, Offer o, RequestWriter r)
@@ -931,11 +932,15 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void subscribe(Group g) throws DbException { public void subscribe(Group g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
boolean added = false;
subscriptionLock.writeLock().lock(); subscriptionLock.writeLock().lock();
try { try {
Txn txn = db.startTransaction(); Txn txn = db.startTransaction();
try { try {
db.addSubscription(txn, g); if(!db.containsSubscription(txn, g.getId())) {
db.addSubscription(txn, g);
added = true;
}
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -944,10 +949,13 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} finally { } finally {
subscriptionLock.writeLock().unlock(); subscriptionLock.writeLock().unlock();
} }
// Call the listeners outside the lock
if(added) callListeners(DatabaseListener.Event.SUBSCRIPTIONS_UPDATED);
} }
public void unsubscribe(GroupId g) throws DbException { public void unsubscribe(GroupId g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g); if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g);
boolean removed = false;
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
messageLock.writeLock().lock(); messageLock.writeLock().lock();
@@ -958,7 +966,10 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
try { try {
Txn txn = db.startTransaction(); Txn txn = db.startTransaction();
try { try {
db.removeSubscription(txn, g); if(db.containsSubscription(txn, g)) {
db.removeSubscription(txn, g);
removed = true;
}
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -976,5 +987,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} finally { } finally {
contactLock.readLock().unlock(); contactLock.readLock().unlock();
} }
// Call the listeners outside the lock
if(removed) callListeners(DatabaseListener.Event.SUBSCRIPTIONS_UPDATED);
} }
} }

View File

@@ -11,6 +11,7 @@ import java.util.logging.Logger;
import net.sf.briar.api.ContactId; import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating; import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseListener;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
@@ -146,7 +147,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} }
} }
// Call the listeners outside the lock // Call the listeners outside the lock
if(added) callMessageListeners(); if(added) callListeners(DatabaseListener.Event.MESSAGES_ADDED);
} }
public void findLostBatches(ContactId c) throws DbException { public void findLostBatches(ContactId c) throws DbException {
@@ -550,7 +551,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} }
} }
// Call the listeners outside the lock // Call the listeners outside the lock
if(anyAdded) callMessageListeners(); if(anyAdded) callListeners(DatabaseListener.Event.MESSAGES_ADDED);
} }
public void receiveOffer(ContactId c, Offer o, RequestWriter r) public void receiveOffer(ContactId c, Offer o, RequestWriter r)
@@ -691,27 +692,37 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void subscribe(Group g) throws DbException { public void subscribe(Group g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
boolean added = false;
synchronized(subscriptionLock) { synchronized(subscriptionLock) {
Txn txn = db.startTransaction(); Txn txn = db.startTransaction();
try { try {
db.addSubscription(txn, g); if(!db.containsSubscription(txn, g.getId())) {
db.addSubscription(txn, g);
added = true;
}
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
throw e; throw e;
} }
} }
// Call the listeners outside the lock
if(added) callListeners(DatabaseListener.Event.SUBSCRIPTIONS_UPDATED);
} }
public void unsubscribe(GroupId g) throws DbException { public void unsubscribe(GroupId g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g); if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g);
boolean removed = false;
synchronized(contactLock) { synchronized(contactLock) {
synchronized(messageLock) { synchronized(messageLock) {
synchronized(messageStatusLock) { synchronized(messageStatusLock) {
synchronized(subscriptionLock) { synchronized(subscriptionLock) {
Txn txn = db.startTransaction(); Txn txn = db.startTransaction();
try { try {
db.removeSubscription(txn, g); if(db.containsSubscription(txn, g)) {
db.removeSubscription(txn, g);
removed = true;
}
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -721,5 +732,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} }
} }
} }
// Call the listeners outside the lock
if(removed) callListeners(DatabaseListener.Event.SUBSCRIPTIONS_UPDATED);
} }
} }

View File

@@ -12,7 +12,7 @@ import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating; import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageListener; import net.sf.briar.api.db.DatabaseListener;
import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.db.Status; import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
@@ -81,7 +81,7 @@ public abstract class DatabaseComponentTest extends TestCase {
final Database<Object> database = context.mock(Database.class); final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final Group group = context.mock(Group.class); final Group group = context.mock(Group.class);
final MessageListener listener = context.mock(MessageListener.class); final DatabaseListener listener = context.mock(DatabaseListener.class);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
allowing(database).startTransaction(); allowing(database).startTransaction();
will(returnValue(txn)); will(returnValue(txn));
@@ -104,12 +104,22 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(database).getTransports(txn, contactId); oneOf(database).getTransports(txn, contactId);
will(returnValue(transports)); will(returnValue(transports));
// subscribe(group) // subscribe(group)
oneOf(group).getId();
will(returnValue(groupId));
oneOf(database).containsSubscription(txn, groupId);
will(returnValue(false));
oneOf(database).addSubscription(txn, group); oneOf(database).addSubscription(txn, group);
oneOf(listener).eventOccurred(
DatabaseListener.Event.SUBSCRIPTIONS_UPDATED);
// getSubscriptions() // getSubscriptions()
oneOf(database).getSubscriptions(txn); oneOf(database).getSubscriptions(txn);
will(returnValue(Collections.singletonList(groupId))); will(returnValue(Collections.singletonList(groupId)));
// unsubscribe(groupId) // unsubscribe(groupId)
oneOf(database).containsSubscription(txn, groupId);
will(returnValue(true));
oneOf(database).removeSubscription(txn, groupId); oneOf(database).removeSubscription(txn, groupId);
oneOf(listener).eventOccurred(
DatabaseListener.Event.SUBSCRIPTIONS_UPDATED);
// removeContact(contactId) // removeContact(contactId)
oneOf(database).removeContact(txn, contactId); oneOf(database).removeContact(txn, contactId);
// close() // close()
@@ -1035,7 +1045,7 @@ public abstract class DatabaseComponentTest extends TestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class); final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final MessageListener listener = context.mock(MessageListener.class); final DatabaseListener listener = context.mock(DatabaseListener.class);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// addLocallyGeneratedMessage(message) // addLocallyGeneratedMessage(message)
oneOf(database).startTransaction(); oneOf(database).startTransaction();
@@ -1054,7 +1064,8 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(database).setSendability(txn, messageId, 0); oneOf(database).setSendability(txn, messageId, 0);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
// The message was added, so the listener should be called // The message was added, so the listener should be called
oneOf(listener).messagesAdded(); oneOf(listener).eventOccurred(
DatabaseListener.Event.MESSAGES_ADDED);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner); DatabaseComponent db = createDatabaseComponent(database, cleaner);
@@ -1070,7 +1081,7 @@ public abstract class DatabaseComponentTest extends TestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class); final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final MessageListener listener = context.mock(MessageListener.class); final DatabaseListener listener = context.mock(DatabaseListener.class);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// addLocallyGeneratedMessage(message) // addLocallyGeneratedMessage(message)
oneOf(database).startTransaction(); oneOf(database).startTransaction();