Incremental subscription updates (untested).

This commit is contained in:
akwizgran
2012-05-19 02:17:41 +02:00
parent fadf0221d2
commit d074652f43
5 changed files with 198 additions and 156 deletions

View File

@@ -125,6 +125,15 @@ interface Database<T> {
*/
void addSubscription(T txn, Group g) throws DbException;
/**
* Records the given contact's subscription to the given group starting at
* the given time.
* <p>
* Locking: contact read, subscription write.
*/
void addSubscription(T txn, ContactId c, Group g, long start)
throws DbException;
/**
* Allocates and returns a local index for the given transport. Returns
* null if all indices have been allocated.
@@ -526,6 +535,16 @@ interface Database<T> {
*/
void removeSubscription(T txn, GroupId g) throws DbException;
/**
* Removes any subscriptions for the given contact with IDs between the
* given IDs. If both of the given IDs are null, all subscriptions are
* removed. If only the first is null, all subscriptions with IDs less than
* the second ID are removed. If onlt the second is null, all subscriptions
* with IDs greater than the first are removed.
*/
void removeSubscriptions(T txn, ContactId c, GroupId start, GroupId end)
throws DbException;
/**
* Makes the given group invisible to the given contact.
* <p>
@@ -551,6 +570,13 @@ interface Database<T> {
void setConnectionWindow(T txn, ContactId c, TransportIndex i,
ConnectionWindow w) throws DbException;
/**
* Sets the given contact's database expiry time.
* <p>
* Locking: contact read, subscription write.
*/
void setExpiryTime(T txn, ContactId c, long expiry) throws DbException;
/**
* Sets the local transport properties for the given transport, replacing
* any existing properties for that transport.
@@ -611,23 +637,23 @@ interface Database<T> {
throws DbException;
/**
* Sets the subscriptions for the given contact, replacing any existing
* subscriptions unless the existing subscriptions have a newer timestamp.
* <p>
* Locking: contact read, subscription write.
*/
void setSubscriptions(T txn, ContactId c, Map<Group, Long> subs,
long timestamp) throws DbException;
/**
* Records the time of the latest subscription modification acknowledged by
* the given contact.
* Records the time of the latest subscription update acknowledged by the
* given contact.
* <p>
* Locking: contact read, subscription write.
*/
void setSubscriptionsAcked(T txn, ContactId c, long timestamp)
throws DbException;
/**
* Records the time of the latest subscription update received from the
* given contact.
* <p>
* Locking: contact read, subscription write.
*/
void setSubscriptionsReceived(T txn, ContactId c, long timestamp)
throws DbException;
/**
* Sets the transports for the given contact, replacing any existing
* transports unless the existing transports have a newer timestamp.

View File

@@ -15,6 +15,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
@@ -1175,8 +1176,17 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
Map<GroupId, GroupId> holes = s.getHoles();
for(Entry<GroupId, GroupId> e : holes.entrySet()) {
GroupId start = e.getKey(), end = e.getValue();
db.removeSubscriptions(txn, c, start, end);
}
Map<Group, Long> subs = s.getSubscriptions();
db.setSubscriptions(txn, c, subs, s.getTimestamp());
for(Entry<Group, Long> e : subs.entrySet()) {
db.addSubscription(txn, c, e.getKey(), e.getValue());
}
db.setExpiryTime(txn, c, s.getExpiryTime());
db.setSubscriptionsReceived(txn, c, s.getTimestamp());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);

View File

@@ -735,10 +735,8 @@ abstract class JdbcDatabase implements Database<Connection> {
public void addSubscription(Connection txn, Group g) throws DbException {
PreparedStatement ps = null;
try {
// Add the group to the subscriptions table
String sql = "INSERT INTO subscriptions"
+ " (groupId, groupName, groupKey, start)"
+ " VALUES (?, ?, ?, ?)";
+ " (groupId, groupName, groupKey, start) VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getId().getBytes());
ps.setString(2, g.getName());
@@ -754,6 +752,43 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addSubscription(Connection txn, ContactId c, Group g,
long start) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Check whether the subscription already exists
String sql = "SELECT NULL FROM contactSubscriptions"
+ " WHERE contactId = ? AND groupId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, g.getId().getBytes());
rs = ps.executeQuery();
boolean found = rs.next();
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
if(found) return;
// Add the subscription
sql = "INSERT INTO contactSubscriptions"
+ " (contactId, groupId, groupName, groupKey, start)"
+ " VALUES (?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, g.getId().getBytes());
ps.setString(3, g.getName());
ps.setBytes(4, g.getPublicKey());
ps.setLong(5, start);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public TransportIndex addTransport(Connection txn, TransportId t)
throws DbException {
PreparedStatement ps = null;
@@ -2234,6 +2269,49 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void removeSubscriptions(Connection txn, ContactId c, GroupId start,
GroupId end) throws DbException {
PreparedStatement ps = null;
try {
if(start == null && end == null) {
// Delete everything
String sql = "DELETE FROM contactSubscriptions"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
} else if(start == null) {
// Delete everything before end
String sql = "DELETE FROM contactSubscriptions"
+ " WHERE contactId = ? AND groupId < ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, end.getBytes());
} else if(end == null) {
// Delete everything after start
String sql = "DELETE FROM contactSubscriptions"
+ " WHERE contactId = ? AND groupId > ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, start.getBytes());
} else {
// Delete everything between start and end
String sql = "DELETE FROM contactSubscriptions"
+ " WHERE contactId = ?"
+ " AND groupId > ? AND groupId < ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, start.getBytes());
ps.setBytes(3, end.getBytes());
}
ps.executeUpdate();
ps.close();
} catch(SQLException e) {
e.printStackTrace();
tryToClose(ps);
throw new DbException(e);
}
}
public void removeVisibility(Connection txn, ContactId c, GroupId g)
throws DbException {
PreparedStatement ps = null;
@@ -2348,6 +2426,24 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setExpiryTime(Connection txn, ContactId c, long expiry)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE subscriptionTimes SET expiry = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, expiry);
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setLocalProperties(Connection txn, TransportId t,
TransportProperties p) throws DbException {
PreparedStatement ps = null;
@@ -2641,66 +2737,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setSubscriptions(Connection txn, ContactId c,
Map<Group, Long> subs, long timestamp) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Return if the timestamp isn't fresh
String sql = "SELECT received FROM subscriptionTimes"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) throw new DbStateException();
long lastTimestamp = rs.getLong(1);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
if(lastTimestamp >= timestamp) return;
// Delete any existing subscriptions
sql = "DELETE FROM contactSubscriptions WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.executeUpdate();
ps.close();
// Store the new subscriptions
sql = "INSERT INTO contactSubscriptions"
+ " (contactId, groupId, groupName, groupKey, start)"
+ " VALUES (?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for(Entry<Group, Long> e : subs.entrySet()) {
Group g = e.getKey();
ps.setBytes(2, g.getId().getBytes());
ps.setString(3, g.getName());
ps.setBytes(4, g.getPublicKey());
ps.setLong(5, e.getValue());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != subs.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
// Update the timestamp
sql = "UPDATE subscriptionTimes SET received = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public void setSubscriptionsAcked(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
@@ -2710,7 +2746,24 @@ abstract class JdbcDatabase implements Database<Connection> {
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
ps.setInt(2, c.getInt());
ps.setLong(3, timestamp);
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setSubscriptionsReceived(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE subscriptionTimes SET received = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();