Database methods for expiry updates and expiry acks.

This commit is contained in:
akwizgran
2013-01-28 23:12:42 +00:00
parent d0904d8f1b
commit 2c27a0251a
9 changed files with 363 additions and 93 deletions

View File

@@ -11,6 +11,8 @@ import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck;
import net.sf.briar.api.protocol.ExpiryUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -96,6 +98,18 @@ public interface DatabaseComponent {
Collection<byte[]> generateBatch(ContactId c, int maxLength, Collection<byte[]> generateBatch(ContactId c, int maxLength,
Collection<MessageId> requested) throws DbException; Collection<MessageId> requested) throws DbException;
/**
* Generates an expiry ack for the given contact. Returns null if no ack
* is due.
*/
ExpiryAck generateExpiryAck(ContactId c) throws DbException;
/**
* Generates an expiry update for the given contact. Returns null if no
* update is due.
*/
ExpiryUpdate generateExpiryUpdate(ContactId c) throws DbException;
/** /**
* Generates an offer for the given contact. Returns null if there are no * Generates an offer for the given contact. Returns null if there are no
* messages to offer. * messages to offer.
@@ -186,6 +200,12 @@ public interface DatabaseComponent {
/** Processes an ack from the given contact. */ /** Processes an ack from the given contact. */
void receiveAck(ContactId c, Ack a) throws DbException; void receiveAck(ContactId c, Ack a) throws DbException;
/** Processes an expiry ack from the given contact. */
void receiveExpiryAck(ContactId c, ExpiryAck a) throws DbException;
/** Processes an expiry update from the given contact. */
void receiveExpiryUpdate(ContactId c, ExpiryUpdate u) throws DbException;
/** Processes a message from the given contact. */ /** Processes a message from the given contact. */
void receiveMessage(ContactId c, Message m) throws DbException; void receiveMessage(ContactId c, Message m) throws DbException;
@@ -204,14 +224,14 @@ public interface DatabaseComponent {
throws DbException; throws DbException;
/** Processes a subscription update from the given contact. */ /** Processes a subscription update from the given contact. */
void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s) void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate u)
throws DbException; throws DbException;
/** Processes a transport ack from the given contact. */ /** Processes a transport ack from the given contact. */
void receiveTransportAck(ContactId c, TransportAck a) throws DbException; void receiveTransportAck(ContactId c, TransportAck a) throws DbException;
/** Processes a transport update from the given contact. */ /** Processes a transport update from the given contact. */
void receiveTransportUpdate(ContactId c, TransportUpdate t) void receiveTransportUpdate(ContactId c, TransportUpdate u)
throws DbException; throws DbException;
/** Removes a contact (and all associated state) from the database. */ /** Removes a contact (and all associated state) from the database. */

View File

@@ -22,11 +22,11 @@ public interface ProtocolWriter {
void writeSubscriptionAck(SubscriptionAck a) throws IOException; void writeSubscriptionAck(SubscriptionAck a) throws IOException;
void writeSubscriptionUpdate(SubscriptionUpdate s) throws IOException; void writeSubscriptionUpdate(SubscriptionUpdate u) throws IOException;
void writeTransportAck(TransportAck a) throws IOException; void writeTransportAck(TransportAck a) throws IOException;
void writeTransportUpdate(TransportUpdate t) throws IOException; void writeTransportUpdate(TransportUpdate u) throws IOException;
void flush() throws IOException; void flush() throws IOException;

View File

@@ -11,6 +11,8 @@ import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader; import net.sf.briar.api.db.MessageHeader;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck;
import net.sf.briar.api.protocol.ExpiryUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -34,6 +36,7 @@ import net.sf.briar.api.transport.TemporarySecret;
* deadlock, locks must be acquired in the following (alphabetical) order: * deadlock, locks must be acquired in the following (alphabetical) order:
* <ul> * <ul>
* <li> contact * <li> contact
* <li> expiry
* <li> message * <li> message
* <li> rating * <li> rating
* <li> subscription * <li> subscription
@@ -206,11 +209,19 @@ interface Database<T> {
Collection<ContactTransport> getContactTransports(T txn) throws DbException; Collection<ContactTransport> getContactTransports(T txn) throws DbException;
/** /**
* Returns the approximate expiry time of the database. * Returns an expiry ack for the given contact, or null if no ack is due.
* <p> * <p>
* Locking: message read. * Locking: contact read, expiry write.
*/ */
long getExpiryTime(T txn) throws DbException; ExpiryAck getExpiryAck(T txn, ContactId c) throws DbException;
/**
* Returns an expiry update for the given contact, or null if no update is
* due.
* <p>
* Locking: contact read, expiry write.
*/
ExpiryUpdate getExpiryUpdate(T txn, ContactId c) throws DbException;
/** /**
* Returns the amount of free storage space available to the database, in * Returns the amount of free storage space available to the database, in
@@ -447,6 +458,14 @@ interface Database<T> {
long incrementConnectionCounter(T txn, ContactId c, TransportId t, long incrementConnectionCounter(T txn, ContactId c, TransportId t,
long period) throws DbException; long period) throws DbException;
/**
* Increments the expiry versions for all contacts to indicate that the
* database's expiry time has changed and expiry updates should be sent.
* <p>
* Locking: contact read, expiry write.
*/
void incrementExpiryVersions(T txn) throws DbException;
/** /**
* Merges the given configuration with the existing configuration for the * Merges the given configuration with the existing configuration for the
* given transport. * given transport.
@@ -531,11 +550,14 @@ interface Database<T> {
long centre, byte[] bitmap) throws DbException; long centre, byte[] bitmap) throws DbException;
/** /**
* Sets the given contact's database expiry time. * Sets the expiry time of the given contact's database, unless an update
* with an equal or higher version number has already been received from
* the contact.
* <p> * <p>
* Locking: contact write. * Locking: contact read, expiry write.
*/ */
void setExpiryTime(T txn, ContactId c, long expiry) throws DbException; void setExpiryTime(T txn, ContactId c, long expiry, long version)
throws DbException;
/** /**
* Sets the user's rating for the given author. * Sets the user's rating for the given author.
@@ -560,7 +582,7 @@ interface Database<T> {
* <p> * <p>
* Locking: contact read, transport write. * Locking: contact read, transport write.
*/ */
void setRemoteProperties(T txn, ContactId c, TransportUpdate t) void setRemoteProperties(T txn, ContactId c, TransportUpdate u)
throws DbException; throws DbException;
/** /**
@@ -604,7 +626,16 @@ interface Database<T> {
* <p> * <p>
* Locking: contact read, subscription write. * Locking: contact read, subscription write.
*/ */
void setSubscriptions(T txn, ContactId c, SubscriptionUpdate s) void setSubscriptions(T txn, ContactId c, SubscriptionUpdate u)
throws DbException;
/**
* Records an expiry ack from the given contact for the given version
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: contact read, expiry write.
*/
void setExpiryUpdateAcked(T txn, ContactId c, long version)
throws DbException; throws DbException;
/** /**

View File

@@ -46,6 +46,8 @@ import net.sf.briar.api.db.event.TransportRemovedEvent;
import net.sf.briar.api.lifecycle.ShutdownManager; import net.sf.briar.api.lifecycle.ShutdownManager;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck;
import net.sf.briar.api.protocol.ExpiryUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -81,6 +83,8 @@ DatabaseCleaner.Callback {
private final ReentrantReadWriteLock contactLock = private final ReentrantReadWriteLock contactLock =
new ReentrantReadWriteLock(true); new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock expiryLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock messageLock = private final ReentrantReadWriteLock messageLock =
new ReentrantReadWriteLock(true); new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock ratingLock = private final ReentrantReadWriteLock ratingLock =
@@ -590,6 +594,54 @@ DatabaseCleaner.Callback {
return Collections.unmodifiableList(messages); return Collections.unmodifiableList(messages);
} }
public ExpiryAck generateExpiryAck(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
ExpiryAck a = db.getExpiryAck(txn, c);
db.commitTransaction(txn);
return a;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public ExpiryUpdate generateExpiryUpdate(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
ExpiryUpdate e = db.getExpiryUpdate(txn, c);
db.commitTransaction(txn);
return e;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public Offer generateOffer(ContactId c, int maxMessages) public Offer generateOffer(ContactId c, int maxMessages)
throws DbException { throws DbException {
Collection<MessageId> offered; Collection<MessageId> offered;
@@ -651,9 +703,9 @@ DatabaseCleaner.Callback {
try { try {
if(!db.containsContact(txn, c)) if(!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
SubscriptionUpdate s = db.getSubscriptionUpdate(txn, c); SubscriptionUpdate u = db.getSubscriptionUpdate(txn, c);
db.commitTransaction(txn); db.commitTransaction(txn);
return s; return u;
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
throw e; throw e;
@@ -1038,6 +1090,54 @@ DatabaseCleaner.Callback {
} }
} }
public void receiveExpiryAck(ContactId c, ExpiryAck a) throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setExpiryUpdateAcked(txn, c, a.getVersionNumber());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveExpiryUpdate(ContactId c, ExpiryUpdate u)
throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setExpiryTime(txn, c, u.getExpiryTime(),
u.getVersionNumber());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveMessage(ContactId c, Message m) throws DbException { public void receiveMessage(ContactId c, Message m) throws DbException {
boolean added = false; boolean added = false;
contactLock.readLock().lock(); contactLock.readLock().lock();
@@ -1149,7 +1249,7 @@ DatabaseCleaner.Callback {
} }
} }
public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s) public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate u)
throws DbException { throws DbException {
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
@@ -1159,7 +1259,7 @@ DatabaseCleaner.Callback {
try { try {
if(!db.containsContact(txn, c)) if(!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
db.setSubscriptions(txn, c, s); db.setSubscriptions(txn, c, u);
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -1200,7 +1300,7 @@ DatabaseCleaner.Callback {
} }
} }
public void receiveTransportUpdate(ContactId c, TransportUpdate t) public void receiveTransportUpdate(ContactId c, TransportUpdate u)
throws DbException { throws DbException {
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
@@ -1210,7 +1310,7 @@ DatabaseCleaner.Callback {
try { try {
if(!db.containsContact(txn, c)) if(!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
db.setRemoteProperties(txn, c, t); db.setRemoteProperties(txn, c, u);
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -1223,7 +1323,7 @@ DatabaseCleaner.Callback {
contactLock.readLock().unlock(); contactLock.readLock().unlock();
} }
// Call the listeners outside the lock // Call the listeners outside the lock
callListeners(new RemoteTransportsUpdatedEvent(c, t.getId())); callListeners(new RemoteTransportsUpdatedEvent(c, u.getId()));
} }
public void removeContact(ContactId c) throws DbException { public void removeContact(ContactId c) throws DbException {
@@ -1502,27 +1602,37 @@ DatabaseCleaner.Callback {
* removed. * removed.
*/ */
private boolean expireMessages(int size) throws DbException { private boolean expireMessages(int size) throws DbException {
Collection<MessageId> old; boolean removed = false;
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
messageLock.writeLock().lock(); expiryLock.writeLock().lock();
try { try {
T txn = db.startTransaction(); messageLock.writeLock().lock();
try { try {
old = db.getOldMessages(txn, size); T txn = db.startTransaction();
for(MessageId m : old) removeMessage(txn, m); try {
db.commitTransaction(txn); Collection<MessageId> old =
} catch(DbException e) { db.getOldMessages(txn, size);
db.abortTransaction(txn); if(!old.isEmpty()) {
throw e; for(MessageId m : old) removeMessage(txn, m);
db.incrementExpiryVersions(txn);
removed = true;
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageLock.writeLock().unlock();
} }
} finally { } finally {
messageLock.writeLock().unlock(); expiryLock.writeLock().unlock();
} }
} finally { } finally {
contactLock.readLock().unlock(); contactLock.readLock().unlock();
} }
return old.isEmpty(); return removed;
} }
/** /**

View File

@@ -31,6 +31,8 @@ import net.sf.briar.api.db.DbClosedException;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader; import net.sf.briar.api.db.MessageHeader;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck;
import net.sf.briar.api.protocol.ExpiryUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -54,9 +56,22 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String CREATE_CONTACTS = private static final String CREATE_CONTACTS =
"CREATE TABLE contacts" "CREATE TABLE contacts"
+ " (contactId COUNTER," + " (contactId COUNTER,"
+ " expiry BIGINT NOT NULL DEFAULT 0," // FIXME: Move this
+ " PRIMARY KEY (contactId))"; + " PRIMARY KEY (contactId))";
// Locking: expiry
private static final String CREATE_EXPIRY_VERSIONS =
"CREATE TABLE expiryVersions"
+ " (contactId INT NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " localVersion BIGINT NOT NULL,"
+ " localAcked BIGINT NOT NULL,"
+ " remoteVersion BIGINT NOT NULL,"
+ " remoteAcked BOOLEAN NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: message // Locking: message
private static final String CREATE_MESSAGES = private static final String CREATE_MESSAGES =
"CREATE TABLE messages" "CREATE TABLE messages"
@@ -340,6 +355,7 @@ abstract class JdbcDatabase implements Database<Connection> {
try { try {
s = txn.createStatement(); s = txn.createStatement();
s.executeUpdate(insertTypeNames(CREATE_CONTACTS)); s.executeUpdate(insertTypeNames(CREATE_CONTACTS));
s.executeUpdate(insertTypeNames(CREATE_EXPIRY_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_MESSAGES)); s.executeUpdate(insertTypeNames(CREATE_MESSAGES));
s.executeUpdate(INDEX_MESSAGES_BY_PARENT); s.executeUpdate(INDEX_MESSAGES_BY_PARENT);
s.executeUpdate(INDEX_MESSAGES_BY_AUTHOR); s.executeUpdate(INDEX_MESSAGES_BY_AUTHOR);
@@ -497,6 +513,16 @@ abstract class JdbcDatabase implements Database<Connection> {
if(rs.next()) throw new DbStateException(); if(rs.next()) throw new DbStateException();
rs.close(); rs.close();
ps.close(); ps.close();
// Create an expiry version row
sql = "INSERT INTO expiryVersions (contactId, expiry,"
+ " localVersion, localAcked, remoteVersion, remoteAcked)"
+ " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, 1);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Create a group version row // Create a group version row
sql = "INSERT INTO groupVersions (contactId, localVersion," sql = "INSERT INTO groupVersions (contactId, localVersion,"
+ " localAcked, remoteVersion, remoteAcked)" + " localAcked, remoteVersion, remoteAcked)"
@@ -1016,27 +1042,68 @@ abstract class JdbcDatabase implements Database<Connection> {
} else return f.length(); } else return f.length();
} }
public long getExpiryTime(Connection txn) throws DbException { public ExpiryAck getExpiryAck(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
long timestamp = 0L; String sql = "SELECT remoteVersion FROM expiryVersions"
String sql = "SELECT timestamp FROM messages" + " WHERE contactId = ? AND remoteAcked = FALSE";
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, 1); ps.setInt(1, c.getInt());
rs = ps.executeQuery(); rs = ps.executeQuery();
if(rs.next()) { if(!rs.next()) {
timestamp = rs.getLong(1); rs.close();
timestamp -= timestamp % EXPIRY_MODULUS; ps.close();
return null;
} }
long version = rs.getLong(1);
if(rs.next()) throw new DbStateException(); if(rs.next()) throw new DbStateException();
rs.close(); rs.close();
ps.close(); ps.close();
return timestamp; sql = "UPDATE expiryVersions SET remoteAcked = TRUE"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return new ExpiryAck(version);
} catch(SQLException e) { } catch(SQLException e) {
tryToClose(rs);
tryToClose(ps); tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public ExpiryUpdate getExpiryUpdate(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT timestamp, localVersion"
+ " FROM messages JOIN expiryVersions"
+ " WHERE contactId = ? AND localVersion > localAcked"
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, 1);
rs = ps.executeQuery();
if(!rs.next()) {
rs.close();
ps.close();
return null;
}
long expiry = rs.getLong(1);
expiry -= expiry % EXPIRY_MODULUS;
long version = rs.getLong(2);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
return new ExpiryUpdate(expiry, version);
} catch(SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e); throw new DbException(e);
} }
} }
@@ -1210,8 +1277,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN contacts AS c" + " JOIN expiryVersions AS ev"
+ " ON cg.contactId = c.contactId" + " ON cg.contactId = ev.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
@@ -1316,8 +1383,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN contacts AS c" + " JOIN expiryVersions AS ev"
+ " ON cg.contactId = c.contactId" + " ON cg.contactId = ev.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
@@ -1577,8 +1644,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN contacts AS c" + " JOIN expiryVersions AS ev"
+ " ON cg.contactId = c.contactId" + " ON cg.contactId = ev.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
@@ -1721,7 +1788,7 @@ abstract class JdbcDatabase implements Database<Connection> {
try { try {
String sql = "SELECT g.groupId, name, key, localVersion" String sql = "SELECT g.groupId, name, key, localVersion"
+ " FROM groups AS g" + " FROM groups AS g"
+ " JOIN groupVisibilities as gv" + " JOIN groupVisibilities AS gv"
+ " ON g.groupId = gv.groupId" + " ON g.groupId = gv.groupId"
+ " JOIN groupVersions AS v" + " JOIN groupVersions AS v"
+ " ON gv.contactId = v.contactId" + " ON gv.contactId = v.contactId"
@@ -1800,7 +1867,7 @@ abstract class JdbcDatabase implements Database<Connection> {
try { try {
String sql = "SELECT transportId, key, value, localVersion" String sql = "SELECT transportId, key, value, localVersion"
+ " FROM transportProperties AS tp" + " FROM transportProperties AS tp"
+ " JOIN transportVersions as tv" + " JOIN transportVersions AS tv"
+ " ON tp.transportId = tv.transportId" + " ON tp.transportId = tv.transportId"
+ " WHERE tv.contactId = ?" + " WHERE tv.contactId = ?"
+ " AND localVersion > localAcked"; + " AND localVersion > localAcked";
@@ -1909,8 +1976,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN contacts AS c" + " JOIN expiryVersions AS ev"
+ " ON cg.contactId = c.contactId" + " ON cg.contactId = ev.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
@@ -1977,6 +2044,20 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public void incrementExpiryVersions(Connection txn) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE expiryVersions"
+ " SET localVersion = localVersion + ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, 1);
ps.executeUpdate();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void removeOutstandingMessages(Connection txn, ContactId c, public void removeOutstandingMessages(Connection txn, ContactId c,
Collection<MessageId> acked) throws DbException { Collection<MessageId> acked) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
@@ -2244,15 +2325,39 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public void setExpiryTime(Connection txn, ContactId c, long expiry) public void setExpiryTime(Connection txn, ContactId c, long expiry,
long version) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE expiryVersions"
+ " SET expiry = ?, remoteVersion = ?, remoteAcked = FALSE"
+ " WHERE contactId = ? AND remoteVersion < ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, expiry);
ps.setLong(2, version);
ps.setInt(3, c.getInt());
ps.setLong(4, version);
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setExpiryUpdateAcked(Connection txn, ContactId c, long version)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
String sql = "UPDATE contacts SET expiry = ?" String sql = "UPDATE expiryVersions SET localAcked = ?"
+ " WHERE contactId = ?"; + " WHERE contactId = ?"
+ " AND localAcked < ? AND localVersion >= ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setLong(1, expiry); ps.setLong(1, version);
ps.setInt(2, c.getInt()); ps.setInt(2, c.getInt());
ps.setLong(3, version);
ps.setLong(4, version);
int affected = ps.executeUpdate(); int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException(); if(affected > 1) throw new DbStateException();
ps.close(); ps.close();
@@ -2362,16 +2467,17 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
public void setRemoteProperties(Connection txn, ContactId c, public void setRemoteProperties(Connection txn, ContactId c,
TransportUpdate t) throws DbException { TransportUpdate u) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
TransportId t = u.getId();
// Find the existing version, if any // Find the existing version, if any
String sql = "SELECT remoteVersion FROM contactTransportVersions" String sql = "SELECT remoteVersion FROM contactTransportVersions"
+ " WHERE contactId = ? AND transportId = ?"; + " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setBytes(2, t.getId().getBytes()); ps.setBytes(2, t.getBytes());
rs = ps.executeQuery(); rs = ps.executeQuery();
long version = rs.next() ? rs.getLong(1) : -1L; long version = rs.next() ? rs.getLong(1) : -1L;
if(rs.next()) throw new DbStateException(); if(rs.next()) throw new DbStateException();
@@ -2385,8 +2491,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " VALUES (?, ?, ?, FALSE)"; + " VALUES (?, ?, ?, FALSE)";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setBytes(2, t.getId().getBytes()); ps.setBytes(2, t.getBytes());
ps.setLong(3, t.getVersionNumber()); ps.setLong(3, u.getVersionNumber());
int affected = ps.executeUpdate(); int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException(); if(affected != 1) throw new DbStateException();
ps.close(); ps.close();
@@ -2396,32 +2502,32 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " SET remoteVersion = ?, remoteAcked = FALSE" + " SET remoteVersion = ?, remoteAcked = FALSE"
+ " WHERE contactId = ? AND transportId = ?"; + " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setLong(1, Math.max(version, t.getVersionNumber())); ps.setLong(1, Math.max(version, u.getVersionNumber()));
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setBytes(2, t.getId().getBytes()); ps.setBytes(2, t.getBytes());
int affected = ps.executeUpdate(); int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException(); if(affected > 1) throw new DbStateException();
ps.close(); ps.close();
// Return if the update is obsolete // Return if the update is obsolete
if(t.getVersionNumber() <= version) return; if(u.getVersionNumber() <= version) return;
} }
// Delete the existing properties, if any // Delete the existing properties, if any
sql = "DELETE FROM contactTransportProperties" sql = "DELETE FROM contactTransportProperties"
+ " WHERE contactId = ? AND transportId = ?"; + " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setBytes(2, t.getId().getBytes()); ps.setBytes(2, t.getBytes());
ps.executeUpdate(); ps.executeUpdate();
ps.close(); ps.close();
// Store the new properties, if any // Store the new properties, if any
TransportProperties p = t.getProperties(); TransportProperties p = u.getProperties();
if(p.isEmpty()) return; if(p.isEmpty()) return;
sql = "INSERT INTO contactTransportProperties" sql = "INSERT INTO contactTransportProperties"
+ " (contactId, transportId, key, value)" + " (contactId, transportId, key, value)"
+ " VALUES (?, ?, ?, ?)"; + " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.setBytes(2, t.getId().getBytes()); ps.setBytes(2, t.getBytes());
for(Entry<String, String> e : p.entrySet()) { for(Entry<String, String> e : p.entrySet()) {
ps.setString(1, e.getKey()); ps.setString(1, e.getKey());
ps.setString(2, e.getValue()); ps.setString(2, e.getValue());
@@ -2568,8 +2674,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN contacts AS c" + " JOIN expiryVersions AS ev"
+ " ON cg.contactId = c.contactId" + " ON cg.contactId = ev.contactId"
+ " WHERE messageId = ?" + " WHERE messageId = ?"
+ " AND cg.contactId = ?" + " AND cg.contactId = ?"
+ " AND timestamp >= expiry"; + " AND timestamp >= expiry";
@@ -2600,7 +2706,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
public void setSubscriptions(Connection txn, ContactId c, public void setSubscriptions(Connection txn, ContactId c,
SubscriptionUpdate s) throws DbException { SubscriptionUpdate u) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
@@ -2620,7 +2726,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " SET remoteVersion = ?, remoteAcked = FALSE" + " SET remoteVersion = ?, remoteAcked = FALSE"
+ " WHERE contactId = ?"; + " WHERE contactId = ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setLong(1, Math.max(version, s.getVersionNumber())); ps.setLong(1, Math.max(version, u.getVersionNumber()));
ps.setInt(2, c.getInt()); ps.setInt(2, c.getInt());
int affected = ps.executeUpdate(); int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException(); if(affected > 1) throw new DbStateException();
@@ -2631,7 +2737,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setInt(1, c.getInt()); ps.setInt(1, c.getInt());
ps.executeUpdate(); ps.executeUpdate();
// Store the new subscriptions, if any // Store the new subscriptions, if any
Collection<Group> subs = s.getGroups(); Collection<Group> subs = u.getGroups();
if(subs.isEmpty()) return; if(subs.isEmpty()) return;
sql = "INSERT INTO contactGroups (contactId, groupId, name, key)" sql = "INSERT INTO contactGroups (contactId, groupId, name, key)"
+ " VALUES (?, ?, ?, ?)"; + " VALUES (?, ?, ?, ?)";
@@ -2664,11 +2770,13 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
String sql = "UPDATE groupVersions SET localAcked = ?" String sql = "UPDATE groupVersions SET localAcked = ?"
+ " WHERE contactId = ? AND localAcked < ?"; + " WHERE contactId = ?"
+ " AND localAcked < ? AND localVersion >= ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setLong(1, version); ps.setLong(1, version);
ps.setInt(2, c.getInt()); ps.setInt(2, c.getInt());
ps.setLong(3, version); ps.setLong(3, version);
ps.setLong(4, version);
int affected = ps.executeUpdate(); int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException(); if(affected > 1) throw new DbStateException();
ps.close(); ps.close();
@@ -2684,12 +2792,13 @@ abstract class JdbcDatabase implements Database<Connection> {
try { try {
String sql = "UPDATE transportVersions SET localAcked = ?" String sql = "UPDATE transportVersions SET localAcked = ?"
+ " WHERE contactId = ? AND transportId = ?" + " WHERE contactId = ? AND transportId = ?"
+ " AND localAcked < ?"; + " AND localAcked < ? AND localVersion >= ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setLong(1, version); ps.setLong(1, version);
ps.setInt(2, c.getInt()); ps.setInt(2, c.getInt());
ps.setBytes(3, t.getBytes()); ps.setBytes(3, t.getBytes());
ps.setLong(4, version); ps.setLong(4, version);
ps.setLong(5, version);
int affected = ps.executeUpdate(); int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException(); if(affected > 1) throw new DbStateException();
ps.close(); ps.close();

View File

@@ -126,11 +126,11 @@ class ProtocolWriterImpl implements ProtocolWriter {
if(flush) out.flush(); if(flush) out.flush();
} }
public void writeSubscriptionUpdate(SubscriptionUpdate s) public void writeSubscriptionUpdate(SubscriptionUpdate u)
throws IOException { throws IOException {
w.writeStructId(SUBSCRIPTION_UPDATE); w.writeStructId(SUBSCRIPTION_UPDATE);
w.writeListStart(); w.writeListStart();
for(Group g : s.getGroups()) { for(Group g : u.getGroups()) {
w.writeStructId(GROUP); w.writeStructId(GROUP);
w.writeString(g.getName()); w.writeString(g.getName());
byte[] publicKey = g.getPublicKey(); byte[] publicKey = g.getPublicKey();
@@ -138,7 +138,7 @@ class ProtocolWriterImpl implements ProtocolWriter {
else w.writeBytes(publicKey); else w.writeBytes(publicKey);
} }
w.writeListEnd(); w.writeListEnd();
w.writeInt64(s.getVersionNumber()); w.writeInt64(u.getVersionNumber());
if(flush) out.flush(); if(flush) out.flush();
} }
@@ -149,11 +149,11 @@ class ProtocolWriterImpl implements ProtocolWriter {
if(flush) out.flush(); if(flush) out.flush();
} }
public void writeTransportUpdate(TransportUpdate t) throws IOException { public void writeTransportUpdate(TransportUpdate u) throws IOException {
w.writeStructId(TRANSPORT_UPDATE); w.writeStructId(TRANSPORT_UPDATE);
w.writeBytes(t.getId().getBytes()); w.writeBytes(u.getId().getBytes());
w.writeMap(t.getProperties()); w.writeMap(u.getProperties());
w.writeInt64(t.getVersionNumber()); w.writeInt64(u.getVersionNumber());
if(flush) out.flush(); if(flush) out.flush();
} }

View File

@@ -173,11 +173,11 @@ abstract class DuplexConnection implements DatabaseListener {
// Start sending the requested messages // Start sending the requested messages
dbExecutor.execute(new GenerateBatches(requested)); dbExecutor.execute(new GenerateBatches(requested));
} else if(reader.hasSubscriptionUpdate()) { } else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate s = reader.readSubscriptionUpdate(); SubscriptionUpdate u = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportUpdate()) { } else if(reader.hasTransportUpdate()) {
TransportUpdate t = reader.readTransportUpdate(); TransportUpdate u = reader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(t)); dbExecutor.execute(new ReceiveTransportUpdate(u));
} else { } else {
throw new FormatException(); throw new FormatException();
} }
@@ -524,8 +524,8 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() { public void run() {
try { try {
SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId);
if(s != null) writerTasks.add(new WriteSubscriptionUpdate(s)); if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} }
@@ -578,7 +578,7 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() { public void run() {
assert writer != null; assert writer != null;
try { try {
for(TransportUpdate t : updates) writer.writeTransportUpdate(t); for(TransportUpdate u : updates) writer.writeTransportUpdate(u);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true); dispose(true, true);

View File

@@ -83,11 +83,11 @@ class IncomingSimplexConnection {
UnverifiedMessage m = reader.readMessage(); UnverifiedMessage m = reader.readMessage();
verificationExecutor.execute(new VerifyMessage(m)); verificationExecutor.execute(new VerifyMessage(m));
} else if(reader.hasSubscriptionUpdate()) { } else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate s = reader.readSubscriptionUpdate(); SubscriptionUpdate u = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportUpdate()) { } else if(reader.hasTransportUpdate()) {
TransportUpdate t = reader.readTransportUpdate(); TransportUpdate u = reader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(t)); dbExecutor.execute(new ReceiveTransportUpdate(u));
} else { } else {
throw new FormatException(); throw new FormatException();
} }

View File

@@ -71,11 +71,11 @@ class OutgoingSimplexConnection {
Collection<TransportUpdate> updates = Collection<TransportUpdate> updates =
db.generateTransportUpdates(contactId); db.generateTransportUpdates(contactId);
if(updates != null) { if(updates != null) {
for(TransportUpdate t : updates) writer.writeTransportUpdate(t); for(TransportUpdate u : updates) writer.writeTransportUpdate(u);
} }
// Write a subscription update. FIXME: Check for space // Write a subscription update. FIXME: Check for space
SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId);
if(s != null) writer.writeSubscriptionUpdate(s); if(u != null) writer.writeSubscriptionUpdate(u);
// Write acks until you can't write acks no more // Write acks until you can't write acks no more
capacity = conn.getRemainingCapacity(); capacity = conn.getRemainingCapacity();
int maxMessages = writer.getMaxMessagesForAck(capacity); int maxMessages = writer.getMaxMessagesForAck(capacity);