Don't send subscription or transport updates unless an update is due.

An update is due if the information has changed since the last update,
or if no update has been sent for 12 hours (to ensure that lost
updates are eventually replaced).
This commit is contained in:
akwizgran
2011-10-19 16:34:58 +01:00
parent 93cd31fa2d
commit c828db2e95
5 changed files with 256 additions and 41 deletions

View File

@@ -344,6 +344,37 @@ interface Database<T> {
*/
Collection<Group> getSubscriptions(T txn, ContactId c) throws DbException;
/**
* Returns the time at which the subscriptions visible to the given contact
* were last modified.
* <p>
* Locking: contacts read, subscriptions read.
*/
long getSubscriptionsModified(T txn, ContactId c) throws DbException;
/**
* Returns the time at which a subscription update was last sent to the
* given contact.
* <p>
* Locking: contacts read, subscriptions read.
*/
long getSubscriptionsSent(T txn, ContactId c) throws DbException;
/**
* Returns the time at which the local transports were last modified.
* <p>
* Locking: transports read.
*/
long getTransportsModified(T txn) throws DbException;
/**
* Returns the time at which a transport update was last sent to the given
* contact.
* <p>
* Locking: contacts read, transports read.
*/
long getTransportsSent(T txn, ContactId c) throws DbException;
/**
* Returns the contacts to which the given group is visible.
* <p>
@@ -492,8 +523,8 @@ interface Database<T> {
* <p>
* Locking: contacts read, subscriptions write.
*/
void setSubscriptionsModifiedTimestamp(T txn,
Collection<ContactId> contacts, long timestamp) throws DbException;
void setSubscriptionsModified(T txn, Collection<ContactId> contacts,
long timestamp) throws DbException;
/**
* Records the time at which a subscription update was last sent to the
@@ -501,7 +532,7 @@ interface Database<T> {
* <p>
* Locking: contacts read, subscriptions write.
*/
void setSubscriptionsSentTimestamp(T txn, ContactId c, long timestamp)
void setSubscriptionsSent(T txn, ContactId c, long timestamp)
throws DbException;
/**
@@ -520,8 +551,7 @@ interface Database<T> {
* <p>
* Locking: contacts read, transports write.
*/
void setTransportsModifiedTimestamp(T txn, long timestamp)
throws DbException;
void setTransportsModified(T txn, long timestamp) throws DbException;
/**
* Records the time at which a transport update was last sent to the given
@@ -529,7 +559,7 @@ interface Database<T> {
* <p>
* Locking: contacts read, transports write.
*/
void setTransportsSentTimestamp(T txn, ContactId c, long timestamp)
void setTransportsSent(T txn, ContactId c, long timestamp)
throws DbException;
/**

View File

@@ -585,8 +585,8 @@ DatabaseCleaner.Callback {
public void generateSubscriptionUpdate(ContactId c, SubscriptionWriter s)
throws DbException, IOException {
Map<Group, Long> subs;
long timestamp;
Map<Group, Long> subs = null;
long timestamp = 0L;
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -594,9 +594,14 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
subs = db.getVisibleSubscriptions(txn, c);
timestamp = System.currentTimeMillis();
db.setSubscriptionsSentTimestamp(txn, c, timestamp);
// Work out whether an update is due
long modified = db.getSubscriptionsModified(txn, c);
long sent = db.getSubscriptionsSent(txn, c);
if(modified >= sent || updateIsDue(sent)) {
subs = db.getVisibleSubscriptions(txn, c);
timestamp = System.currentTimeMillis();
db.setSubscriptionsSent(txn, c, timestamp);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -608,15 +613,22 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
s.writeSubscriptions(subs, timestamp);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions to update");
if(subs != null) {
s.writeSubscriptions(subs, timestamp);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions to update");
}
}
private boolean updateIsDue(long sent) {
long now = System.currentTimeMillis();
return now - sent >= DatabaseConstants.MAX_UPDATE_INTERVAL;
}
public void generateTransportUpdate(ContactId c, TransportWriter t)
throws DbException, IOException {
Map<TransportId, TransportProperties> transports;
long timestamp;
Map<TransportId, TransportProperties> transports = null;
long timestamp = 0L;
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -624,9 +636,14 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
transports = db.getLocalTransports(txn);
timestamp = System.currentTimeMillis();
db.setTransportsSentTimestamp(txn, c, timestamp);
// Work out whether an update is due
long modified = db.getTransportsModified(txn);
long sent = db.getTransportsSent(txn, c);
if(modified >= sent || updateIsDue(sent)) {
transports = db.getLocalTransports(txn);
timestamp = System.currentTimeMillis();
db.setTransportsSent(txn, c, timestamp);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -638,9 +655,12 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
t.writeTransports(transports, timestamp);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports to update");
if(transports != null) {
t.writeTransports(transports, timestamp);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() +
" transports to update");
}
}
public TransportConfig getConfig(TransportId t) throws DbException {
@@ -1178,8 +1198,7 @@ DatabaseCleaner.Callback {
try {
if(!p.equals(db.getLocalProperties(txn, t))) {
db.setLocalProperties(txn, t, p);
db.setTransportsModifiedTimestamp(txn,
System.currentTimeMillis());
db.setTransportsModified(txn, System.currentTimeMillis());
changed = true;
}
db.commitTransaction(txn);
@@ -1290,20 +1309,31 @@ DatabaseCleaner.Callback {
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
// Use HashSets for O(1) lookups, giving O(n) overall running time
HashSet<ContactId> then, now;
Collection<ContactId> affected;
contactLock.readLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
// Use HashSets for O(1) lookups, O(n) overall running time
HashSet<ContactId> then, now;
// Retrieve the group's current visibility
then = new HashSet<ContactId>(db.getVisibility(txn, g));
// Don't try to make the group visible to ex-contacts
now = new HashSet<ContactId>(visible);
now.retainAll(new HashSet<ContactId>(db.getContacts(txn)));
db.setVisibility(txn, g, now);
// Work out which contacts were affected by the change
affected = new ArrayList<ContactId>();
for(ContactId c : then) {
if(!now.contains(c)) affected.add(c);
}
for(ContactId c : now) {
if(!then.contains(c)) affected.add(c);
}
db.setSubscriptionsModified(txn, affected,
System.currentTimeMillis());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1315,10 +1345,6 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
// Work out which contacts were affected by the change
Collection<ContactId> affected = new ArrayList<ContactId>();
for(ContactId c : then) if(!now.contains(c)) affected.add(c);
for(ContactId c : now) if(!then.contains(c)) affected.add(c);
// Call the listeners outside the lock
if(!affected.isEmpty())
callListeners(new SubscriptionsUpdatedEvent(affected));
@@ -1344,7 +1370,6 @@ DatabaseCleaner.Callback {
public void unsubscribe(GroupId g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g);
boolean removed = false;
Collection<ContactId> affected = null;
contactLock.readLock().lock();
try {
@@ -1359,7 +1384,6 @@ DatabaseCleaner.Callback {
if(db.containsSubscription(txn, g)) {
affected = db.getVisibility(txn, g);
db.removeSubscription(txn, g);
removed = true;
}
db.commitTransaction(txn);
} catch(DbException e) {
@@ -1379,7 +1403,7 @@ DatabaseCleaner.Callback {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(removed && !affected.isEmpty())
if(affected != null && !affected.isEmpty())
callListeners(new SubscriptionsUpdatedEvent(affected));
}

View File

@@ -47,4 +47,10 @@ interface DatabaseConstants {
* recently sent batches have been acknowledged.
*/
static final int RETRANSMIT_THRESHOLD = 5;
/**
* The time in milliseconds after which a subscription or transport update
* should be sent to a contact even if no changes have occurred.
*/
static final long MAX_UPDATE_INTERVAL = 12L * 60L * 60L * 1000L; // 12 hours
}

View File

@@ -1498,6 +1498,95 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public long getSubscriptionsModified(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT modified FROM subscriptionTimestamps"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) throw new DbException();
long modified = rs.getLong(1);
if(rs.next()) throw new DbException();
rs.close();
ps.close();
return modified;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public long getSubscriptionsSent(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT sent FROM subscriptionTimestamps"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) throw new DbException();
long sent = rs.getLong(1);
if(rs.next()) throw new DbException();
rs.close();
ps.close();
return sent;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public long getTransportsModified(Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT DISTINCT modified FROM transportTimestamps";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
if(!rs.next()) throw new DbException();
long modified = rs.getLong(1);
if(rs.next()) throw new DbException();
rs.close();
ps.close();
return modified;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public long getTransportsSent(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT sent FROM transportTimestamps"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) throw new DbException();
long sent = rs.getLong(1);
if(rs.next()) throw new DbException();
rs.close();
ps.close();
return sent;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Collection<ContactId> getVisibility(Connection txn, GroupId g)
throws DbException {
PreparedStatement ps = null;
@@ -2112,7 +2201,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setSubscriptionsModifiedTimestamp(Connection txn,
public void setSubscriptionsModified(Connection txn,
Collection<ContactId> contacts, long timestamp) throws DbException {
PreparedStatement ps = null;
try {
@@ -2137,7 +2226,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setSubscriptionsSentTimestamp(Connection txn, ContactId c,
public void setSubscriptionsSent(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
try {
@@ -2219,7 +2308,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setTransportsModifiedTimestamp(Connection txn, long timestamp)
public void setTransportsModified(Connection txn, long timestamp)
throws DbException {
PreparedStatement ps = null;
try {
@@ -2234,8 +2323,8 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setTransportsSentTimestamp(Connection txn, ContactId c,
long timestamp) throws DbException {
public void setTransportsSent(Connection txn, ContactId c, long timestamp)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE transportTimestamps SET sent = ?"

View File

@@ -735,6 +735,34 @@ public abstract class DatabaseComponentTest extends TestCase {
context.assertIsSatisfied();
}
@Test
public void testSubscriptionUpdateNotSentUnlessDue() throws Exception {
final long now = System.currentTimeMillis();
Mockery context = new Mockery();
@SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final SubscriptionWriter subscriptionWriter =
context.mock(SubscriptionWriter.class);
context.checking(new Expectations() {{
allowing(database).startTransaction();
will(returnValue(txn));
allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
// Check whether an update is due
oneOf(database).getSubscriptionsModified(txn, contactId);
will(returnValue(now - 1L));
oneOf(database).getSubscriptionsSent(txn, contactId);
will(returnValue(now));
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.generateSubscriptionUpdate(contactId, subscriptionWriter);
context.assertIsSatisfied();
}
@Test
public void testGenerateSubscriptionUpdate() throws Exception {
final MessageId messageId1 = new MessageId(TestUtils.getRandomId());
@@ -753,10 +781,15 @@ public abstract class DatabaseComponentTest extends TestCase {
allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
// Check whether an update is due
oneOf(database).getSubscriptionsModified(txn, contactId);
will(returnValue(0L));
oneOf(database).getSubscriptionsSent(txn, contactId);
will(returnValue(0L));
// Get the visible subscriptions
oneOf(database).getVisibleSubscriptions(txn, contactId);
will(returnValue(Collections.singletonMap(group, 0L)));
oneOf(database).setSubscriptionsSentTimestamp(with(txn), with(contactId),
oneOf(database).setSubscriptionsSent(with(txn), with(contactId),
with(any(long.class)));
// Add the subscriptions to the writer
oneOf(subscriptionWriter).writeSubscriptions(
@@ -770,6 +803,34 @@ public abstract class DatabaseComponentTest extends TestCase {
context.assertIsSatisfied();
}
@Test
public void testTransportUpdateNotSentUnlessDue() throws Exception {
final long now = System.currentTimeMillis();
Mockery context = new Mockery();
@SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final TransportWriter transportWriter =
context.mock(TransportWriter.class);
context.checking(new Expectations() {{
allowing(database).startTransaction();
will(returnValue(txn));
allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
// Check whether an update is due
oneOf(database).getTransportsModified(txn);
will(returnValue(now - 1L));
oneOf(database).getTransportsSent(txn, contactId);
will(returnValue(now));
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.generateTransportUpdate(contactId, transportWriter);
context.assertIsSatisfied();
}
@Test
public void testGenerateTransportUpdate() throws Exception {
final MessageId messageId1 = new MessageId(TestUtils.getRandomId());
@@ -788,10 +849,15 @@ public abstract class DatabaseComponentTest extends TestCase {
allowing(database).commitTransaction(txn);
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
// Check whether an update is due
oneOf(database).getTransportsModified(txn);
will(returnValue(0L));
oneOf(database).getTransportsSent(txn, contactId);
will(returnValue(0L));
// Get the local transport properties
oneOf(database).getLocalTransports(txn);
will(returnValue(transports));
oneOf(database).setTransportsSentTimestamp(with(txn), with(contactId),
oneOf(database).setTransportsSent(with(txn), with(contactId),
with(any(long.class)));
// Add the properties to the writer
oneOf(transportWriter).writeTransports(with(transports),
@@ -1287,7 +1353,7 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(database).getLocalProperties(txn, transportId);
will(returnValue(new TransportProperties()));
oneOf(database).setLocalProperties(txn, transportId, properties);
oneOf(database).setTransportsModifiedTimestamp(with(txn),
oneOf(database).setTransportsModified(with(txn),
with(any(long.class)));
oneOf(database).commitTransaction(txn);
oneOf(listener).eventOccurred(with(any(