Attach the affected contact IDs to subscription update events.

This commit is contained in:
akwizgran
2011-10-17 23:24:23 +01:00
parent ec56b12384
commit 2f457162a5
13 changed files with 69 additions and 32 deletions

View File

@@ -116,11 +116,12 @@ interface Database<T> {
boolean addPrivateMessage(T txn, Message m, ContactId c) throws DbException;
/**
* Subscribes to the given group.
* Subscribes to the given group and returns true if the subscription did
* not previously exist.
* <p>
* Locking: subscriptions write.
*/
void addSubscription(T txn, Group g) throws DbException;
boolean addSubscription(T txn, Group g) throws DbException;
/**
* Returns true if the database contains the given contact.

View File

@@ -11,6 +11,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1277,19 +1278,19 @@ DatabaseCleaner.Callback {
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
Collection<ContactId> then, now;
contactLock.readLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
// Remove any ex-contacts from the set
Collection<ContactId> present =
new ArrayList<ContactId>(visible.size());
for(ContactId c : visible) {
if(db.containsContact(txn, c)) present.add(c);
}
db.setVisibility(txn, g, present);
// Get the contacts to which the group used to be visible
then = new HashSet<ContactId>(db.getVisibility(txn, g));
// Don't try to make the group visible to ex-contacts
now = new HashSet<ContactId>(visible);
now.retainAll(new HashSet<ContactId>(db.getContacts(txn)));
db.setVisibility(txn, g, now);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1301,19 +1302,22 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
// Work out which contacts were affected by the change
Collection<ContactId> affected = new ArrayList<ContactId>();
for(ContactId c : then) if(!now.contains(c)) affected.add(c);
for(ContactId c : now) if(!then.contains(c)) affected.add(c);
// Call the listeners outside the lock
callListeners(new SubscriptionsUpdatedEvent(affected));
}
public void subscribe(Group g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
boolean added = false;
boolean added;
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(db.containsSubscription(txn, g.getId())) {
db.addSubscription(txn, g);
added = true;
}
added = db.addSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1328,7 +1332,8 @@ DatabaseCleaner.Callback {
public void unsubscribe(GroupId g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g);
boolean removed = false;
boolean removed;
Collection<ContactId> affected;
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
@@ -1339,6 +1344,7 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
affected = db.getVisibility(txn, g);
removed = db.removeSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
@@ -1358,7 +1364,7 @@ DatabaseCleaner.Callback {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(removed) callListeners(new SubscriptionsUpdatedEvent());
if(removed) callListeners(new SubscriptionsUpdatedEvent(affected));
}
public void checkFreeSpaceAndClean() throws DbException {

View File

@@ -652,10 +652,20 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addSubscription(Connection txn, Group g) throws DbException {
public boolean addSubscription(Connection txn, Group g) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "INSERT INTO subscriptions"
String sql = "SELECT NULL FROM subscriptions WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getId().getBytes());
rs = ps.executeQuery();
boolean found = rs.next();
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
if(found) return false;
sql = "INSERT INTO subscriptions"
+ " (groupId, groupName, groupKey, start)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
@@ -666,6 +676,7 @@ abstract class JdbcDatabase implements Database<Connection> {
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return true;
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);

View File

@@ -102,9 +102,12 @@ abstract class StreamConnection implements DatabaseListener {
writerFlags |= Flags.MESSAGES_ADDED;
notifyAll();
} else if(e instanceof SubscriptionsUpdatedEvent) {
// FIXME: Check whether the change affected this contact
writerFlags |= Flags.SUBSCRIPTIONS_UPDATED;
notifyAll();
Collection<ContactId> affected =
((SubscriptionsUpdatedEvent) e).getAffectedContacts();
if(affected.contains(contactId)) {
writerFlags |= Flags.SUBSCRIPTIONS_UPDATED;
notifyAll();
}
} else if(e instanceof TransportsUpdatedEvent) {
writerFlags |= Flags.TRANSPORTS_UPDATED;
notifyAll();