Implemented subscription visibility. If a subscription is not visible

to a contact, do not accept, offer, or send messages belonging to that
group to or from that contact, and do not list that group in
subscription updates sent to that contact.
This commit is contained in:
akwizgran
2011-07-27 16:43:19 +01:00
parent 4311b1a224
commit e93fbe0b20
8 changed files with 398 additions and 36 deletions

View File

@@ -129,6 +129,15 @@ interface Database<T> {
*/
boolean containsSubscription(T txn, GroupId g) throws DbException;
/**
* Returns true iff the user is subscribed to the given group and the
* group is visible to the given contact.
* <p>
* Locking: contacts read, subscriptions read.
*/
boolean containsVisibleSubscription(T txn, GroupId g, ContactId c)
throws DbException;
/**
* Returns the IDs of any batches received from the given contact that need
* to be acknowledged.
@@ -195,7 +204,8 @@ interface Database<T> {
/**
* Returns the number of children of the message identified by the given
* ID that are present in the database and sendable.
* ID that are present in the database and have sendability scores greater
* than zero.
* <p>
* Locking: messages read.
*/
@@ -269,6 +279,20 @@ interface Database<T> {
*/
Map<String, String> getTransports(T txn, ContactId c) throws DbException;
/**
* Returns the contacts to which the given group is visible.
* <p>
* Locking: contacts read, subscriptions read.
*/
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
/**
* Returns the groups to which the user subscribes that are visible to the
* given contact.
*/
Collection<Group> getVisibleSubscriptions(T txn, ContactId c)
throws DbException;
/**
* Removes an outstanding batch that has been acknowledged. Any messages in
* the batch that are still considered outstanding (Status.SENT) with
@@ -380,4 +404,13 @@ interface Database<T> {
*/
void setTransports(T txn, ContactId c, Map<String, String> transports,
long timestamp) throws DbException;
/**
* Makes the given group visible to the given set of contacts and invisible
* to any other contacts.
* <p>
* Locking: contacts read, subscriptions write.
*/
void setVisibility(T txn, GroupId g, Collection<ContactId> visible)
throws DbException;
}

View File

@@ -36,8 +36,8 @@ import net.sf.briar.util.FileUtils;
*/
abstract class JdbcDatabase implements Database<Connection> {
private static final String CREATE_LOCAL_SUBSCRIPTIONS =
"CREATE TABLE localSubscriptions"
private static final String CREATE_SUBSCRIPTIONS =
"CREATE TABLE subscriptions"
+ " (groupId HASH NOT NULL,"
+ " groupName VARCHAR NOT NULL,"
+ " groupKey BINARY,"
@@ -54,7 +54,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " raw BLOB NOT NULL,"
+ " sendability INT NOT NULL,"
+ " PRIMARY KEY (messageId),"
+ " FOREIGN KEY (groupId) REFERENCES localSubscriptions (groupId)"
+ " FOREIGN KEY (groupId) REFERENCES subscriptions (groupId)"
+ " ON DELETE CASCADE)";
private static final String INDEX_MESSAGES_BY_PARENT =
@@ -76,6 +76,19 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " transportsTimestamp TIMESTAMP NOT NULL,"
+ " PRIMARY KEY (contactId))";
private static final String CREATE_VISIBILITIES =
"CREATE TABLE visibilities"
+ " (groupId HASH NOT NULL,"
+ " contactId INT NOT NULL,"
+ " PRIMARY KEY (groupId, contactId),"
+ " FOREIGN KEY (groupId) REFERENCES subscriptions (groupId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String INDEX_VISIBILITIES_BY_GROUP =
"CREATE INDEX visibilitiesByGroup ON visibilities (groupId)";
private static final String CREATE_BATCHES_TO_ACK =
"CREATE TABLE batchesToAck"
+ " (batchId HASH NOT NULL,"
@@ -216,8 +229,8 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
s = txn.createStatement();
if(LOG.isLoggable(Level.FINE))
LOG.fine("Creating localSubscriptions table");
s.executeUpdate(insertTypeNames(CREATE_LOCAL_SUBSCRIPTIONS));
LOG.fine("Creating subscriptions table");
s.executeUpdate(insertTypeNames(CREATE_SUBSCRIPTIONS));
if(LOG.isLoggable(Level.FINE))
LOG.fine("Creating messages table");
s.executeUpdate(insertTypeNames(CREATE_MESSAGES));
@@ -228,6 +241,10 @@ abstract class JdbcDatabase implements Database<Connection> {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Creating contacts table");
s.executeUpdate(insertTypeNames(CREATE_CONTACTS));
if(LOG.isLoggable(Level.FINE))
LOG.fine("Creating visibilities table");
s.executeUpdate(insertTypeNames(CREATE_VISIBILITIES));
s.executeUpdate(INDEX_VISIBILITIES_BY_GROUP);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Creating batchesToAck table");
s.executeUpdate(insertTypeNames(CREATE_BATCHES_TO_ACK));
@@ -527,7 +544,7 @@ abstract class JdbcDatabase implements Database<Connection> {
public void addSubscription(Connection txn, Group g) throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO localSubscriptions"
String sql = "INSERT INTO subscriptions"
+ " (groupId, groupName, groupKey)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
@@ -603,7 +620,7 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT COUNT(groupId) FROM localSubscriptions"
String sql = "SELECT COUNT(groupId) FROM subscriptions"
+ " WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
@@ -625,6 +642,36 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean containsVisibleSubscription(Connection txn, GroupId g,
ContactId c) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT COUNT(subscriptions.groupId)"
+ " FROM subscriptions JOIN visibilities"
+ " ON subscriptions.groupId = visibilities.groupId"
+ " WHERE subscriptions.groupId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
ps.setInt(2, c.getInt());
rs = ps.executeQuery();
boolean found = rs.next();
assert found;
int count = rs.getInt(1);
assert count <= 1;
boolean more = rs.next();
assert !more;
rs.close();
ps.close();
return count > 0;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public Collection<BatchId> getBatchesToAck(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
@@ -759,16 +806,20 @@ abstract class JdbcDatabase implements Database<Connection> {
String sql = "SELECT size, raw FROM messages"
+ " JOIN contactSubscriptions"
+ " ON messages.groupId = contactSubscriptions.groupId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " JOIN statuses ON messages.messageId = statuses.messageId"
+ " WHERE messages.messageId = ?"
+ " AND contactSubscriptions.contactId = ?"
+ " AND statuses.contactId = ? AND status = ?"
+ " AND sendability > ZERO()";
+ " AND visibilities.contactId = ?"
+ " AND statuses.contactId = ?"
+ " AND status = ? AND sendability > ZERO()";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) Status.NEW.ordinal());
ps.setInt(4, c.getInt());
ps.setShort(5, (short) Status.NEW.ordinal());
rs = ps.executeQuery();
byte[] raw = null;
if(rs.next()) {
@@ -985,15 +1036,19 @@ abstract class JdbcDatabase implements Database<Connection> {
String sql = "SELECT size, messages.messageId FROM messages"
+ " JOIN contactSubscriptions"
+ " ON messages.groupId = contactSubscriptions.groupId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " JOIN statuses ON messages.messageId = statuses.messageId"
+ " WHERE contactSubscriptions.contactId = ?"
+ " AND statuses.contactId = ? AND status = ?"
+ " AND sendability > ZERO()";
+ " AND visibilities.contactId = ?"
+ " AND statuses.contactId = ?"
+ " AND status = ? AND sendability > ZERO()";
// FIXME: Investigate the performance impact of "ORDER BY timestamp"
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, c.getInt());
ps.setShort(3, (short) Status.NEW.ordinal());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) Status.NEW.ordinal());
rs = ps.executeQuery();
Collection<MessageId> ids = new ArrayList<MessageId>();
int total = 0;
@@ -1025,7 +1080,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
String sql = "SELECT groupId, groupName, groupKey"
+ " FROM localSubscriptions";
+ " FROM subscriptions";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Collection<Group> subs = new ArrayList<Group>();
@@ -1075,6 +1130,36 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<Group> getVisibleSubscriptions(Connection txn,
ContactId c) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT subscriptions.groupId, groupName, groupKey"
+ " FROM subscriptions JOIN visibilities"
+ " ON subscriptions.groupId = visibilities.groupId"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
Collection<Group> subs = new ArrayList<Group>();
while(rs.next()) {
GroupId id = new GroupId(rs.getBytes(1));
String name = rs.getString(2);
byte[] publicKey = rs.getBytes(3);
subs.add(groupFactory.createGroup(id, name, publicKey));
}
rs.close();
ps.close();
return subs;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public Map<String, String> getTransports(Connection txn)
throws DbException {
PreparedStatement ps = null;
@@ -1119,9 +1204,30 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<ContactId> getVisibility(Connection txn, GroupId g)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT contactId FROM visibilities WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
rs = ps.executeQuery();
Collection<ContactId> visible = new ArrayList<ContactId>();
while(rs.next()) visible.add(new ContactId(rs.getInt(1)));
rs.close();
ps.close();
return visible;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public void removeAckedBatch(Connection txn, ContactId c, BatchId b)
throws DbException {
// Increment the passover count of all older outstanding batches
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1138,6 +1244,7 @@ abstract class JdbcDatabase implements Database<Connection> {
assert !more;
rs.close();
ps.close();
// Increment the passover count of all older outstanding batches
sql = "UPDATE outstandingBatches SET passover = passover + ?"
+ " WHERE contactId = ? AND timestamp < ?";
ps = txn.prepareStatement(sql);
@@ -1270,7 +1377,7 @@ abstract class JdbcDatabase implements Database<Connection> {
throws DbException {
PreparedStatement ps = null;
try {
String sql = "DELETE FROM localSubscriptions WHERE groupId = ?";
String sql = "DELETE FROM subscriptions WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
int rowsAffected = ps.executeUpdate();
@@ -1403,10 +1510,15 @@ abstract class JdbcDatabase implements Database<Connection> {
String sql = "SELECT COUNT(messages.messageId) FROM messages"
+ " JOIN contactSubscriptions"
+ " ON messages.groupId = contactSubscriptions.groupId"
+ " WHERE messageId = ? AND contactId = ?";
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " WHERE messageId = ?"
+ " AND contactSubscriptions.contactId = ?"
+ " AND visibilities.contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setInt(3, c.getInt());
rs = ps.executeQuery();
boolean found = rs.next();
assert found;
@@ -1504,7 +1616,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.executeUpdate();
ps.close();
// Store the new transports
if(transports != null) {
if(!transports.isEmpty()) {
sql = "INSERT INTO localTransports (key, value)"
+ " VALUES (?, ?)";
ps = txn.prepareStatement(sql);
@@ -1585,4 +1697,36 @@ abstract class JdbcDatabase implements Database<Connection> {
throw new DbException(e);
}
}
public void setVisibility(Connection txn, GroupId g,
Collection<ContactId> visible) throws DbException {
PreparedStatement ps = null;
try {
// Delete any existing visibilities
String sql = "DELETE FROM visibilities WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
ps.executeUpdate();
ps.close();
if(visible.isEmpty()) return;
// Store the new visibilities
sql = "INSERT INTO visibilities (groupId, contactId)"
+ " VALUES (?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
for(ContactId c : visible) {
ps.setInt(2, c.getInt());
ps.addBatch();
}
int[] rowsAffectedArray = ps.executeBatch();
assert rowsAffectedArray.length == visible.size();
for(int i = 0; i < rowsAffectedArray.length; i++) {
assert rowsAffectedArray[i] == 1;
}
} catch(SQLException e) {
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
}

View File

@@ -448,7 +448,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
try {
Txn txn = db.startTransaction();
try {
Collection<Group> subs = db.getSubscriptions(txn);
Collection<Group> subs = db.getVisibleSubscriptions(txn, c);
s.writeSubscriptions(subs);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
@@ -588,6 +588,28 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public Collection<ContactId> getVisibility(GroupId g) throws DbException {
contactLock.readLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Collection<ContactId> visible = db.getVisibility(txn, g);
db.commitTransaction(txn);
return visible;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveAck(ContactId c, Ack a) throws DbException {
// Mark all messages in acked batches as seen
contactLock.readLock().lock();
@@ -638,7 +660,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
for(Message m : b.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsSubscription(txn, g)) {
if(db.containsVisibleSubscription(txn, g, c)) {
if(storeMessage(txn, m, c)) stored++;
}
}
@@ -821,6 +843,34 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
contactLock.readLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
// Remove any ex-contacts from the set
Collection<ContactId> present =
new ArrayList<ContactId>(visible.size());
for(ContactId c : visible) {
if(db.containsContact(txn, c)) present.add(c);
}
db.setVisibility(txn, g, present);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void subscribe(Group g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
subscriptionLock.writeLock().lock();

View File

@@ -322,7 +322,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Collection<Group> subs = db.getSubscriptions(txn);
Collection<Group> subs = db.getVisibleSubscriptions(txn, c);
s.writeSubscriptions(subs);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
@@ -434,6 +434,22 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public Collection<ContactId> getVisibility(GroupId g) throws DbException {
synchronized(contactLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Collection<ContactId> visible = db.getVisibility(txn, g);
db.commitTransaction(txn);
return visible;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public void receiveAck(ContactId c, Ack a) throws DbException {
// Mark all messages in acked batches as seen
synchronized(contactLock) {
@@ -471,7 +487,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
for(Message m : b.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsSubscription(txn, g)) {
if(db.containsVisibleSubscription(txn, g, c)) {
if(storeMessage(txn, m, c)) stored++;
}
}
@@ -604,6 +620,28 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
synchronized(contactLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
// Remove any ex-contacts from the set
Collection<ContactId> present =
new ArrayList<ContactId>(visible.size());
for(ContactId c : visible) {
if(db.containsContact(txn, c)) present.add(c);
}
db.setVisibility(txn, g, present);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public void subscribe(Group g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
synchronized(subscriptionLock) {