Further progress towards incremental subscription updates.

This commit is contained in:
akwizgran
2012-05-18 23:54:03 +02:00
parent b4f0da53b5
commit 1ca4ea9dcd
16 changed files with 340 additions and 405 deletions

View File

@@ -14,8 +14,8 @@ public interface PacketFactory {
Request createRequest(BitSet requested, int length);
SubscriptionUpdate createSubscriptionUpdate(Map<Group, Long> subs,
long timestamp);
SubscriptionUpdate createSubscriptionUpdate(Map<GroupId, GroupId> holes,
Map<Group, Long> subs, long expiry, long timestamp);
TransportUpdate createTransportUpdate(Collection<Transport> transports,
long timestamp);

View File

@@ -21,9 +21,6 @@ public interface ProtocolConstants {
/** The maximum length of a property's key or value in UTF-8 bytes. */
static final int MAX_PROPERTY_LENGTH = 100;
/** The maximum number of groups a node may subscribe to. */
static final int MAX_GROUPS = 5000;
/** The maximum length of a group's name in UTF-8 bytes. */
static final int MAX_GROUP_NAME_LENGTH = 50;

View File

@@ -5,9 +5,18 @@ import java.util.Map;
/** A packet updating the sender's subscriptions. */
public interface SubscriptionUpdate {
/** Returns the holes contained in the update. */
Map<GroupId, GroupId> getHoles();
/** Returns the subscriptions contained in the update. */
Map<Group, Long> getSubscriptions();
/**
* Returns the expiry time of the contact's database. Messages that are
* older than the expiry time must not be sent to the contact.
*/
long getExpiryTime();
/**
* Returns the update's timestamp. Updates that are older than the newest
* update received from the same contact must be ignored.

View File

@@ -88,7 +88,8 @@ interface Database<T> {
* and should be erased by the caller once the transaction has been
* committed or aborted.
* <p>
* Locking: contact write.
* Locking: contact write, subscription write, transport write,
* window write.
*/
ContactId addContact(T txn, byte[] inSecret, byte[] outSecret,
Collection<byte[]> erase) throws DbException;
@@ -224,6 +225,13 @@ interface Database<T> {
*/
Collection<ContactId> getContacts(T txn) throws DbException;
/**
* Returns the approximate expiry time of the database.
* <p>
* Locking: message read.
*/
long getExpiryTime(T txn) throws DbException;
/**
* Returns the amount of free storage space available to the database, in
* bytes. This is based on the minimum of the space available on the device
@@ -410,22 +418,6 @@ interface Database<T> {
*/
Collection<Group> getSubscriptions(T txn, ContactId c) throws DbException;
/**
* Returns the time at which the subscriptions visible to the given contact
* were last modified.
* <p>
* Locking: contact read, subscription read.
*/
long getSubscriptionsModified(T txn, ContactId c) throws DbException;
/**
* Returns the time at which a subscription update was last sent to the
* given contact.
* <p>
* Locking: contact read, subscription read.
*/
long getSubscriptionsSent(T txn, ContactId c) throws DbException;
/**
* Returns the time at which the local transports were last modified.
* <p>
@@ -456,10 +448,23 @@ interface Database<T> {
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
/**
* Returns the groups to which the user subscribes that are visible to the
* given contact.
* Returns any holes covering unsubscriptions that are visible to the given
* contact, occurred strictly before the given timestamp, and have not yet
* been acknowledged.
* <p>
* Locking: contact read, subscription read.
*/
Map<Group, Long> getVisibleSubscriptions(T txn, ContactId c)
Map<GroupId, GroupId> getVisibleHoles(T txn, ContactId c, long timestamp)
throws DbException;
/**
* Returns any subscriptions that are visible to the given contact,
* occurred strictly before the given timestamp, and have not yet been
* acknowledged.
* <p>
* Locking: contact read, subscription read.
*/
Map<Group, Long> getVisibleSubscriptions(T txn, ContactId c, long timestamp)
throws DbException;
/**
@@ -615,21 +620,12 @@ interface Database<T> {
long timestamp) throws DbException;
/**
* Records the time at which the subscriptions visible to the given contacts
* were last modified.
* Records the time of the latest subscription modification acknowledged by
* the given contact.
* <p>
* Locking: contact read, subscription write.
*/
void setSubscriptionsModified(T txn, Collection<ContactId> contacts,
long timestamp) throws DbException;
/**
* Records the time at which a subscription update was last sent to the
* given contact.
* <p>
* Locking: contact read, subscription write.
*/
void setSubscriptionsSent(T txn, ContactId c, long timestamp)
void setSubscriptionsAcked(T txn, ContactId c, long timestamp)
throws DbException;
/**

View File

@@ -173,13 +173,28 @@ DatabaseCleaner.Callback {
Collection<byte[]> erase = new ArrayList<byte[]>();
contactLock.writeLock().lock();
try {
T txn = db.startTransaction();
subscriptionLock.writeLock().lock();
try {
c = db.addContact(txn, inSecret, outSecret, erase);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
transportLock.writeLock().lock();
try {
windowLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
c = db.addContact(txn, inSecret, outSecret, erase);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
windowLock.writeLock().unlock();
}
} finally {
transportLock.writeLock().unlock();
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
@@ -606,9 +621,9 @@ DatabaseCleaner.Callback {
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c)
throws DbException {
boolean due;
Map<GroupId, GroupId> holes;
Map<Group, Long> subs;
long timestamp;
long expiry, timestamp;
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -616,10 +631,10 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
// Work out whether an update is due
long modified = db.getSubscriptionsModified(txn, c);
long sent = db.getSubscriptionsSent(txn, c);
due = modified >= sent || updateIsDue(sent);
timestamp = System.currentTimeMillis() - 1;
holes = db.getVisibleHoles(txn, c, timestamp);
subs = db.getVisibleSubscriptions(txn, c, timestamp);
expiry = db.getExpiryTime(txn);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -628,26 +643,11 @@ DatabaseCleaner.Callback {
} finally {
subscriptionLock.readLock().unlock();
}
if(!due) return null;
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
subs = db.getVisibleSubscriptions(txn, c);
timestamp = System.currentTimeMillis();
db.setSubscriptionsSent(txn, c, timestamp);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
return packetFactory.createSubscriptionUpdate(subs, timestamp);
return packetFactory.createSubscriptionUpdate(holes, subs, expiry,
timestamp);
}
private boolean updateIsDue(long sent) {
@@ -1448,10 +1448,6 @@ DatabaseCleaner.Callback {
affected.add(c);
}
}
if(!affected.isEmpty()) {
db.setSubscriptionsModified(txn, affected,
System.currentTimeMillis());
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);

View File

@@ -90,7 +90,7 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_MESSAGES_BY_AUTHOR =
"CREATE INDEX messagesByAuthor ON messages (authorId)";
private static final String INDEX_MESSAGES_BY_BIGINT =
private static final String INDEX_MESSAGES_BY_TIMESTAMP =
"CREATE INDEX messagesByTimestamp ON messages (timestamp)";
private static final String INDEX_MESSAGES_BY_SENDABILITY =
@@ -238,12 +238,12 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_SUBSCRIPTION_TIMESTAMPS =
"CREATE TABLE subscriptionTimestamps"
private static final String CREATE_SUBSCRIPTION_TIMES =
"CREATE TABLE subscriptionTimes"
+ " (contactId INT NOT NULL,"
+ " sent BIGINT NOT NULL,"
+ " received BIGINT NOT NULL,"
+ " modified BIGINT NOT NULL,"
+ " acked BIGINT NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
@@ -332,7 +332,7 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(insertTypeNames(CREATE_MESSAGES));
s.executeUpdate(INDEX_MESSAGES_BY_PARENT);
s.executeUpdate(INDEX_MESSAGES_BY_AUTHOR);
s.executeUpdate(INDEX_MESSAGES_BY_BIGINT);
s.executeUpdate(INDEX_MESSAGES_BY_TIMESTAMP);
s.executeUpdate(INDEX_MESSAGES_BY_SENDABILITY);
s.executeUpdate(insertTypeNames(CREATE_VISIBILITIES));
s.executeUpdate(INDEX_VISIBILITIES_BY_GROUP);
@@ -352,7 +352,7 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(insertTypeNames(CREATE_CONTACT_TRANSPORT_PROPS));
s.executeUpdate(insertTypeNames(CREATE_CONNECTION_CONTEXTS));
s.executeUpdate(insertTypeNames(CREATE_CONNECTION_WINDOWS));
s.executeUpdate(insertTypeNames(CREATE_SUBSCRIPTION_TIMESTAMPS));
s.executeUpdate(insertTypeNames(CREATE_SUBSCRIPTION_TIMES));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_TIMESTAMPS));
s.executeUpdate(insertTypeNames(CREATE_FLAGS));
s.close();
@@ -521,9 +521,17 @@ abstract class JdbcDatabase implements Database<Connection> {
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
// Create the head-of-list pointer for the visibility list
sql = "INSERT INTO visibilities (contactId, deleted)"
+ " VALUES (?, ZERO())";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Initialise the subscription timestamps
sql = "INSERT INTO subscriptionTimestamps"
+ " (contactId, sent, received, modified)"
sql = "INSERT INTO subscriptionTimes"
+ " (contactId, received, acked, expiry)"
+ " VALUES (?, ZERO(), ZERO(), ZERO())";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -732,7 +740,8 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setBytes(1, g.getId().getBytes());
ps.setString(2, g.getName());
ps.setBytes(3, g.getPublicKey());
ps.setLong(4, System.currentTimeMillis());
long now = System.currentTimeMillis();
ps.setLong(4, now);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
@@ -788,81 +797,57 @@ abstract class JdbcDatabase implements Database<Connection> {
// Insert the group ID into the linked list
byte[] id = g.getBytes();
String sql = "SELECT groupId, nextId, deleted FROM visibilities"
+ " WHERE contactId = ?"
+ " ORDER BY groupId";
+ " WHERE contactId = ? ORDER BY groupId";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(rs.next()) {
// The head pointer of the list exists
byte[] groupId = rs.getBytes(1);
if(groupId != null) throw new DbStateException();
byte[] nextId = rs.getBytes(2);
long deleted = rs.getLong(3);
// Scan through the list to find the insertion point
while(nextId != null && ByteUtils.compare(id, nextId) > 0) {
if(!rs.next()) throw new DbStateException();
groupId = rs.getBytes(1);
if(groupId == null) throw new DbStateException();
nextId = rs.getBytes(2);
deleted = rs.getLong(3);
}
rs.close();
ps.close();
// Update the previous element
if(groupId == null) {
// Inserting at the head of the list
sql = "UPDATE visibilities SET nextId = ?"
+ " WHERE contactId = ? AND groupId IS NULL";
ps = txn.prepareStatement(sql);
ps.setBytes(1, id);
ps.setInt(2, c.getInt());
} else {
// Inserting in the middle or at the tail of the list
sql = "UPDATE visibilities SET nextId = ?"
+ " WHERE contactId = ? AND groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, id);
ps.setInt(2, c.getInt());
ps.setBytes(3, groupId);
}
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Insert the new element
sql = "INSERT INTO visibilities"
+ " (contactId, groupId, nextId, deleted)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, id);
if(nextId == null) ps.setNull(3, Types.BINARY); // At the tail
else ps.setBytes(3, nextId); // In the middle
ps.setLong(4, deleted);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
} else {
// The head pointer of the list does not exist
rs.close();
ps.close();
sql = "INSERT INTO visibilities (contactId, nextId, deleted)"
+ " VALUES (?, ?, ZERO())";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, id);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
sql = "INSERT INTO visibilities (contactId, groupId, deleted)"
+ " VALUES (?, ?, ZERO())";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, id);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
if(!rs.next()) throw new DbStateException();
// Scan through the list to find the insertion point
byte[] groupId = rs.getBytes(1);
if(groupId != null) throw new DbStateException();
byte[] nextId = rs.getBytes(2);
long deleted = rs.getLong(3);
while(nextId != null && ByteUtils.compare(id, nextId) > 0) {
if(!rs.next()) throw new DbStateException();
groupId = rs.getBytes(1);
if(groupId == null) throw new DbStateException();
nextId = rs.getBytes(2);
deleted = rs.getLong(3);
}
rs.close();
ps.close();
// Update the previous element
if(groupId == null) {
// Inserting at the head of the list
sql = "UPDATE visibilities SET nextId = ?"
+ " WHERE contactId = ? AND groupId IS NULL";
ps = txn.prepareStatement(sql);
ps.setBytes(1, id);
ps.setInt(2, c.getInt());
} else {
// Inserting in the middle or at the tail of the list
sql = "UPDATE visibilities SET nextId = ?"
+ " WHERE contactId = ? AND groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, id);
ps.setInt(2, c.getInt());
ps.setBytes(3, groupId);
}
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Insert the new element
sql = "INSERT INTO visibilities"
+ " (contactId, groupId, nextId, deleted) VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, id);
if(nextId == null) ps.setNull(3, Types.BINARY); // At the tail
else ps.setBytes(3, nextId); // In the middle
ps.setLong(4, deleted);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
@@ -935,19 +920,17 @@ abstract class JdbcDatabase implements Database<Connection> {
public boolean containsSubscription(Connection txn, GroupId g, long time)
throws DbException {
boolean found = false;
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT start FROM subscriptions WHERE groupId = ?";
String sql = "SELECT NULL FROM subscriptions"
+ " WHERE groupId = ? AND start <= ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
ps.setLong(2, time);
rs = ps.executeQuery();
if(rs.next()) {
long start = rs.getLong(1);
if(start <= time) found = true;
if(rs.next()) throw new DbStateException();
}
boolean found = rs.next();
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
return found;
@@ -1128,6 +1111,31 @@ abstract class JdbcDatabase implements Database<Connection> {
} else return f.length();
}
public long getExpiryTime(Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
long timestamp = 0L;
String sql = "SELECT timestamp FROM messages"
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, 1);
rs = ps.executeQuery();
if(rs.next()) {
timestamp = rs.getLong(1);
timestamp -= timestamp % DatabaseConstants.EXPIRY_MODULUS;
}
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
return timestamp;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public MessageId getGroupMessageParent(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
@@ -1371,16 +1379,21 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.close();
if(raw != null) return raw;
// Do we have a sendable group message with the given ID?
sql = "SELECT length, raw FROM messages AS m"
sql = "SELECT length, raw FROM messages"
+ " JOIN contactSubscriptions AS cs"
+ " ON m.groupId = cs.groupId"
+ " JOIN visibilities AS v"
+ " ON m.groupId = v.groupId AND cs.contactId = v.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId AND cs.contactId = s.contactId"
+ " WHERE m.messageId = ?"
+ " ON messages.groupId = cs.groupId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " AND cs.contactId = visibilities.contactId"
+ " JOIN statuses"
+ " ON messages.messageId = statuses.messageId"
+ " AND cs.contactId = statuses.contactId"
+ " JOIN subscriptionTimes"
+ " ON cs.contactId = subscriptionTimes.contactId"
+ " WHERE messages.messageId = ?"
+ " AND cs.contactId = ?"
+ " AND timestamp >= start"
+ " AND timestamp >= expiry"
+ " AND status = ?"
+ " AND sendability > ZERO()";
ps = txn.prepareStatement(sql);
@@ -1561,12 +1574,12 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT ct.contactId, key, value"
+ " FROM contactTransports AS ct"
String sql = "SELECT contactTransports.contactId, key, value"
+ " FROM contactTransports"
+ " LEFT OUTER JOIN contactTransportProperties AS ctp"
+ " ON ct.transportId = ctp.transportId"
+ " WHERE ct.transportId = ?"
+ " ORDER BY ct.contactId";
+ " ON contactTransports.transportId = ctp.transportId"
+ " WHERE contactTransports.transportId = ?"
+ " ORDER BY contactTransports.contactId";
ps = txn.prepareStatement(sql);
ps.setBytes(1, t.getBytes());
rs = ps.executeQuery();
@@ -1639,15 +1652,20 @@ abstract class JdbcDatabase implements Database<Connection> {
if(ids.size() == maxMessages)
return Collections.unmodifiableList(ids);
// Do we have any sendable group messages?
sql = "SELECT m.messageId FROM messages AS m"
sql = "SELECT messages.messageId FROM messages"
+ " JOIN contactSubscriptions AS cs"
+ " ON m.groupId = cs.groupId"
+ " JOIN visibilities AS v"
+ " ON m.groupId = v.groupId AND cs.contactId = v.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId AND cs.contactId = s.contactId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " AND cs.contactId = visibilities.contactId"
+ " JOIN statuses"
+ " ON messages.messageId = statuses.messageId"
+ " AND cs.contactId = statuses.contactId"
+ " JOIN subscriptionTimes"
+ " ON cs.contactId = subscriptionTimes.contactId"
+ " WHERE cs.contactId = ?"
+ " AND timestamp >= start"
+ " AND timestamp >= expiry"
+ " AND status = ?"
+ " AND sendability > ZERO()"
+ " ORDER BY timestamp"
@@ -1694,15 +1712,20 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.close();
if(total == capacity) return Collections.unmodifiableList(ids);
// Do we have any sendable group messages?
sql = "SELECT length, m.messageId FROM messages AS m"
sql = "SELECT length, messages.messageId FROM messages"
+ " JOIN contactSubscriptions AS cs"
+ " ON m.groupId = cs.groupId"
+ " JOIN visibilities AS v"
+ " ON m.groupId = v.groupId AND cs.contactId = v.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId AND cs.contactId = s.contactId"
+ " ON messages.groupId = cs.groupId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " AND cs.contactId = visibilities.contactId"
+ " JOIN statuses"
+ " ON messages.messageId = statuses.messageId"
+ " AND cs.contactId = statuses.contactId"
+ " JOIN subscriptionTimes"
+ " ON cs.contactId = subscriptionTimes.contactId"
+ " WHERE cs.contactId = ?"
+ " AND timestamp >= start"
+ " AND timestamp >= expiry"
+ " AND status = ?"
+ " AND sendability > ZERO()"
+ " ORDER BY timestamp";
@@ -1801,52 +1824,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public long getSubscriptionsModified(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT modified FROM subscriptionTimestamps"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) throw new DbException();
long modified = rs.getLong(1);
if(rs.next()) throw new DbException();
rs.close();
ps.close();
return modified;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public long getSubscriptionsSent(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT sent FROM subscriptionTimestamps"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) throw new DbException();
long sent = rs.getLong(1);
if(rs.next()) throw new DbException();
rs.close();
ps.close();
return sent;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public long getTransportsModified(Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -1938,32 +1915,33 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Map<Group, Long> getVisibleSubscriptions(Connection txn, ContactId c)
throws DbException {
long expiry = getApproximateExpiryTime(txn);
public Map<GroupId, GroupId> getVisibleHoles(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql =
"SELECT subscriptions.groupId, groupName, groupKey, start"
+ " FROM subscriptions JOIN visibilities"
+ " ON subscriptions.groupId = visibilities.groupId"
+ " WHERE contactId = ?";
String sql = "SELECT groupId, nextId FROM visibilities"
+ " JOIN subscriptionTimes"
+ " ON visibilities.contactId = subscriptionTimes.contactId"
+ " WHERE visibilities.contactId = ?"
+ " AND deleted > acked AND deleted < ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, timestamp);
rs = ps.executeQuery();
Map<Group, Long> subs = new HashMap<Group, Long>();
Map<GroupId, GroupId> holes = null;
while(rs.next()) {
GroupId id = new GroupId(rs.getBytes(1));
String name = rs.getString(2);
byte[] publicKey = rs.getBytes(3);
Group g = groupFactory.createGroup(id, name, publicKey);
long start = Math.max(rs.getLong(4), expiry);
subs.put(g, start);
byte[] b = rs.getBytes(1);
GroupId groupId = b == null ? null : new GroupId(b);
b = rs.getBytes(2);
GroupId nextId = b == null ? null : new GroupId(b);
if(holes == null) holes = new HashMap<GroupId, GroupId>();
holes.put(groupId, nextId);
}
rs.close();
ps.close();
return Collections.unmodifiableMap(subs);
if(holes == null) return Collections.emptyMap();
return Collections.unmodifiableMap(holes);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
@@ -1971,24 +1949,36 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
private long getApproximateExpiryTime(Connection txn) throws DbException {
public Map<Group, Long> getVisibleSubscriptions(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
long timestamp = 0L;
String sql = "SELECT timestamp FROM messages"
+ " ORDER BY timestamp LIMIT ?";
String sql =
"SELECT subscriptions.groupId, groupName, groupKey, start"
+ " FROM subscriptions JOIN visibilities"
+ " ON subscriptions.groupId = visibilities.groupId"
+ " JOIN subscriptionTimes"
+ " ON visibilities.contactId = subscriptionTimes.contactId"
+ " WHERE visibilities.contactId = ?"
+ " AND start > acked AND start < ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, 1);
ps.setInt(1, c.getInt());
ps.setLong(2, timestamp);
rs = ps.executeQuery();
if(rs.next()) {
timestamp = rs.getLong(1);
timestamp -= timestamp % DatabaseConstants.EXPIRY_MODULUS;
Map<Group, Long> subs = null;
while(rs.next()) {
GroupId id = new GroupId(rs.getBytes(1));
String name = rs.getString(2);
byte[] publicKey = rs.getBytes(3);
long start = rs.getLong(4);
if(subs == null) subs = new HashMap<Group, Long>();
subs.put(groupFactory.createGroup(id, name, publicKey), start);
}
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
return timestamp;
if(subs == null) return Collections.emptyMap();
return Collections.unmodifiableMap(subs);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
@@ -2017,15 +2007,20 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.close();
if(found) return true;
// Do we have any sendable group messages?
sql = "SELECT m.messageId FROM messages AS m"
sql = "SELECT messages.messageId FROM messages"
+ " JOIN contactSubscriptions AS cs"
+ " ON m.groupId = cs.groupId"
+ " JOIN visibilities AS v"
+ " ON m.groupId = v.groupId AND cs.contactId = v.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId AND cs.contactId = s.contactId"
+ " ON messages.groupId = cs.groupId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " AND cs.contactId = visibilities.contactId"
+ " JOIN statuses"
+ " ON messages.messageId = statuses.messageId"
+ " AND cs.contactId = statuses.contactId"
+ " JOIN subscriptionTimes"
+ " ON cs.contactId = subscriptionTimes.contactId"
+ " WHERE cs.contactId = ?"
+ " AND timestamp >= start"
+ " AND timestamp >= expiry"
+ " AND status = ?"
+ " AND sendability > ZERO()"
+ " LIMIT ?";
@@ -2600,14 +2595,18 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT NULL FROM messages AS m"
String sql = "SELECT NULL FROM messages"
+ " JOIN contactSubscriptions AS cs"
+ " ON m.groupId = cs.groupId"
+ " JOIN visibilities AS v"
+ " ON m.groupId = v.groupId AND cs.contactId = v.contactId"
+ " ON messages.groupId = cs.groupId"
+ " JOIN visibilities"
+ " ON messages.groupId = visibilities.groupId"
+ " AND cs.contactId = visibilities.contactId"
+ " JOIN subscriptionTimes"
+ " ON cs.contactId = subscriptionTimes.contactId"
+ " WHERE messageId = ?"
+ " AND cs.contactId = ?"
+ " AND timestamp >= start";
+ " AND timestamp >= start"
+ " AND timestamp >= expiry";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
@@ -2640,7 +2639,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
// Return if the timestamp isn't fresh
String sql = "SELECT received FROM subscriptionTimestamps"
String sql = "SELECT received FROM subscriptionTimes"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -2679,7 +2678,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
ps.close();
// Update the timestamp
sql = "UPDATE subscriptionTimestamps SET received = ?"
sql = "UPDATE subscriptionTimes SET received = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
@@ -2694,37 +2693,12 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setSubscriptionsModified(Connection txn,
Collection<ContactId> contacts, long timestamp) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE subscriptionTimestamps SET modified = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
for(ContactId c : contacts) {
ps.setInt(2, c.getInt());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != contacts.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] > 1) throw new DbStateException();
}
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setSubscriptionsSent(Connection txn, ContactId c,
public void setSubscriptionsAcked(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE subscriptionTimestamps SET sent = ?"
+ " WHERE contactId = ? AND sent < ?";
String sql = "UPDATE subscriptionTimes SET acked = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
ps.setInt(2, c.getInt());

View File

@@ -9,6 +9,7 @@ import net.sf.briar.api.crypto.MessageDigest;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
@@ -47,9 +48,10 @@ class PacketFactoryImpl implements PacketFactory {
return new RequestImpl(requested, length);
}
public SubscriptionUpdate createSubscriptionUpdate(Map<Group, Long> subs,
public SubscriptionUpdate createSubscriptionUpdate(
Map<GroupId, GroupId> holes, Map<Group, Long> subs, long expiry,
long timestamp) {
return new SubscriptionUpdateImpl(subs, timestamp);
return new SubscriptionUpdateImpl(holes, subs, expiry, timestamp);
}
public TransportUpdate createTransportUpdate(

View File

@@ -10,6 +10,7 @@ import java.util.Map.Entry;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolWriter;
@@ -112,12 +113,23 @@ class ProtocolWriterImpl implements ProtocolWriter {
public void writeSubscriptionUpdate(SubscriptionUpdate s)
throws IOException {
w.writeStructId(Types.SUBSCRIPTION_UPDATE);
// Holes
w.writeMapStart();
for(Entry<GroupId, GroupId> e : s.getHoles().entrySet()) {
w.writeBytes(e.getKey().getBytes());
w.writeBytes(e.getValue().getBytes());
}
w.writeMapEnd();
// Subscriptions
w.writeMapStart();
for(Entry<Group, Long> e : s.getSubscriptions().entrySet()) {
writeGroup(w, e.getKey());
w.writeInt64(e.getValue());
}
w.writeMapEnd();
// Expiry time
w.writeInt64(s.getExpiryTime());
// Timestamp
w.writeInt64(s.getTimestamp());
if(flush) out.flush();
}

View File

@@ -3,22 +3,35 @@ package net.sf.briar.protocol;
import java.util.Map;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.SubscriptionUpdate;
class SubscriptionUpdateImpl implements SubscriptionUpdate {
private final Map<GroupId, GroupId> holes;
private final Map<Group, Long> subs;
private final long timestamp;
private final long expiry, timestamp;
SubscriptionUpdateImpl(Map<Group, Long> subs, long timestamp) {
SubscriptionUpdateImpl(Map<GroupId, GroupId> holes, Map<Group, Long> subs,
long expiry, long timestamp) {
this.holes = holes;
this.subs = subs;
this.expiry = expiry;
this.timestamp = timestamp;
}
public Map<GroupId, GroupId> getHoles() {
return holes;
}
public Map<Group, Long> getSubscriptions() {
return subs;
}
public long getExpiryTime() {
return expiry;
}
public long getTimestamp() {
return timestamp;
}

View File

@@ -3,17 +3,20 @@ package net.sf.briar.protocol;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Types;
import net.sf.briar.api.protocol.UniqueId;
import net.sf.briar.api.serial.Consumer;
import net.sf.briar.api.serial.CountingConsumer;
import net.sf.briar.api.serial.StructReader;
import net.sf.briar.api.serial.Reader;
import net.sf.briar.api.serial.StructReader;
class SubscriptionUpdateReader implements StructReader<SubscriptionUpdate> {
@@ -32,13 +35,32 @@ class SubscriptionUpdateReader implements StructReader<SubscriptionUpdate> {
// Read the data
r.addConsumer(counting);
r.readStructId(Types.SUBSCRIPTION_UPDATE);
// Holes
Map<GroupId, GroupId> holes = new HashMap<GroupId, GroupId>();
r.setMaxBytesLength(UniqueId.LENGTH);
r.readMapStart();
while(!r.hasMapEnd()) {
byte[] start = r.readBytes();
if(start.length != UniqueId.LENGTH) throw new FormatException();
byte[] end = r.readBytes();
if(end.length != UniqueId.LENGTH)throw new FormatException();
holes.put(new GroupId(start), new GroupId(end));
}
r.readMapEnd();
r.resetMaxBytesLength();
// Subscriptions
r.addStructReader(Types.GROUP, groupReader);
Map<Group, Long> subs = r.readMap(Group.class, Long.class);
r.removeStructReader(Types.GROUP);
// Expiry time
long expiry = r.readInt64();
if(expiry < 0L) throw new FormatException();
// Timestamp
long timestamp = r.readInt64();
if(timestamp < 0L) throw new FormatException();
r.removeConsumer(counting);
// Build and return the subscription update
return packetFactory.createSubscriptionUpdate(subs, timestamp);
return packetFactory.createSubscriptionUpdate(holes, subs, expiry,
timestamp);
}
}

View File

@@ -25,6 +25,7 @@ import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupFactory;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageFactory;
import net.sf.briar.api.protocol.MessageId;
@@ -172,8 +173,8 @@ public class ProtocolIntegrationTest extends BriarTestCase {
Map<Group, Long> subs = new LinkedHashMap<Group, Long>();
subs.put(group, 0L);
subs.put(group1, 0L);
SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(subs,
timestamp);
SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(
Collections.<GroupId, GroupId>emptyMap(), subs, 0L, timestamp);
writer.writeSubscriptionUpdate(s);
TransportUpdate t = packetFactory.createTransportUpdate(transports,

View File

@@ -802,35 +802,6 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
context.assertIsSatisfied();
}
@Test
public void testSubscriptionUpdateNotSentUnlessDue() throws Exception {
final long now = System.currentTimeMillis();
Mockery context = new Mockery();
@SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final ShutdownManager shutdown = context.mock(ShutdownManager.class);
final PacketFactory packetFactory = context.mock(PacketFactory.class);
context.checking(new Expectations() {{
allowing(database).startTransaction();
will(returnValue(txn));
allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
// Check whether an update is due
oneOf(database).getSubscriptionsModified(txn, contactId);
will(returnValue(now - 1L));
oneOf(database).getSubscriptionsSent(txn, contactId);
will(returnValue(now));
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner,
shutdown, packetFactory);
assertNull(db.generateSubscriptionUpdate(contactId));
context.assertIsSatisfied();
}
@Test
public void testGenerateSubscriptionUpdate() throws Exception {
Mockery context = new Mockery();
@@ -847,19 +818,21 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
// Check whether an update is due
oneOf(database).getSubscriptionsModified(txn, contactId);
will(returnValue(0L));
oneOf(database).getSubscriptionsSent(txn, contactId);
will(returnValue(0L));
// Get the visible subscriptions
oneOf(database).getVisibleSubscriptions(txn, contactId);
will(returnValue(Collections.singletonMap(group, 0L)));
oneOf(database).setSubscriptionsSent(with(txn), with(contactId),
// Get the visible holes and subscriptions
oneOf(database).getVisibleHoles(with(txn), with(contactId),
with(any(long.class)));
will(returnValue(Collections.<GroupId, GroupId>emptyMap()));
oneOf(database).getVisibleSubscriptions(with(txn), with(contactId),
with(any(long.class)));
will(returnValue(Collections.singletonMap(group, 0L)));
// Get the expiry time
oneOf(database).getExpiryTime(txn);
will(returnValue(0L));
// Create the packet
oneOf(packetFactory).createSubscriptionUpdate(
with(Collections.<GroupId, GroupId>emptyMap()),
with(Collections.singletonMap(group, 0L)),
with(any(long.class)),
with(any(long.class)));
will(returnValue(subscriptionUpdate));
}});
@@ -1557,9 +1530,6 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
oneOf(database).getContacts(txn);
will(returnValue(both));
oneOf(database).removeVisibility(txn, contactId1, groupId);
oneOf(database).setSubscriptionsModified(with(txn),
with(Collections.singletonList(contactId1)),
with(any(long.class)));
oneOf(database).commitTransaction(txn);
oneOf(listener).eventOccurred(with(any(
SubscriptionsUpdatedEvent.class)));

View File

@@ -1628,37 +1628,6 @@ public class H2DatabaseTest extends BriarTestCase {
db.close();
}
@Test
public void testTimestamps() throws Exception {
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
// Add a contact
assertEquals(contactId, db.addContact(txn, inSecret, outSecret, erase));
// The subscription and transport timestamps should be initialised to 0
assertEquals(0L, db.getSubscriptionsModified(txn, contactId));
assertEquals(0L, db.getSubscriptionsSent(txn, contactId));
assertEquals(0L, db.getTransportsModified(txn));
assertEquals(0L, db.getTransportsSent(txn, contactId));
// Update the timestamps
db.setSubscriptionsModified(txn,
Collections.singletonList(contactId), 1L);
db.setSubscriptionsSent(txn, contactId, 2L);
db.setTransportsModified(txn, 3L);
db.setTransportsSent(txn, contactId, 4L);
// Check that the updated values were stored
assertEquals(1L, db.getSubscriptionsModified(txn, contactId));
assertEquals(2L, db.getSubscriptionsSent(txn, contactId));
assertEquals(3L, db.getTransportsModified(txn));
assertEquals(4L, db.getTransportsSent(txn, contactId));
db.commitTransaction(txn);
db.close();
}
@Test
public void testGetMessageBody() throws Exception {
Database<Connection> db = open(false);

View File

@@ -2,7 +2,6 @@ package net.sf.briar.protocol;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_AUTHOR_NAME_LENGTH;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_BODY_LENGTH;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_GROUPS;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_GROUP_NAME_LENGTH;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PROPERTIES_PER_TRANSPORT;
@@ -16,8 +15,6 @@ import java.security.PrivateKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.sf.briar.BriarTestCase;
import net.sf.briar.TestUtils;
@@ -36,7 +33,6 @@ import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.ProtocolWriter;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.RawBatch;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportIndex;
@@ -155,30 +151,6 @@ public class ConstantsTest extends BriarTestCase {
assertTrue(out.size() <= length);
}
@Test
public void testSubscriptionsFitIntoUpdate() throws Exception {
// Create the maximum number of maximum-length subscriptions
Map<Group, Long> subs = new HashMap<Group, Long>(MAX_GROUPS);
byte[] publicKey = new byte[MAX_PUBLIC_KEY_LENGTH];
for(int i = 0; i < MAX_GROUPS; i++) {
String name = createRandomString(MAX_GROUP_NAME_LENGTH);
Group group = groupFactory.createGroup(name, publicKey);
subs.put(group, Long.MAX_VALUE);
}
// Add the subscriptions to an update
ByteArrayOutputStream out =
new ByteArrayOutputStream(MAX_PACKET_LENGTH);
ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out,
true);
SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(subs,
Long.MAX_VALUE);
writer.writeSubscriptionUpdate(s);
// Check the size of the serialised update
assertTrue(out.size() > MAX_GROUPS *
(MAX_GROUP_NAME_LENGTH + MAX_PUBLIC_KEY_LENGTH + 8) + 8);
assertTrue(out.size() <= MAX_PACKET_LENGTH);
}
@Test
public void testTransportsFitIntoUpdate() throws Exception {
// Create the maximum number of plugins, each with the maximum number

View File

@@ -14,6 +14,7 @@ import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupFactory;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageFactory;
import net.sf.briar.api.protocol.Offer;
@@ -97,7 +98,8 @@ public class ProtocolReadWriteTest extends BriarTestCase {
writer.writeRequest(r);
SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(
subscriptions, timestamp);
Collections.<GroupId, GroupId>emptyMap(), subscriptions, 0L,
timestamp);
writer.writeSubscriptionUpdate(s);
TransportUpdate t = packetFactory.createTransportUpdate(transports,

View File

@@ -114,8 +114,8 @@ public class SimplexConnectionReadWriteTest extends BriarTestCase {
alice.getInstance(ConnectionWriterFactory.class);
ProtocolWriterFactory protoFactory =
alice.getInstance(ProtocolWriterFactory.class);
TestSimplexTransportWriter transport = new TestSimplexTransportWriter(out,
Long.MAX_VALUE, false);
TestSimplexTransportWriter transport = new TestSimplexTransportWriter(
out, Long.MAX_VALUE, false);
OutgoingSimplexConnection simplex = new OutgoingSimplexConnection(db,
connRegistry, connFactory, protoFactory, contactId, transportId,
transportIndex, transport);