Record when the transports and subscriptions visible to each contact

were last modified.

In future this will be used to determine when to send updates.
This commit is contained in:
akwizgran
2011-10-18 17:32:32 +01:00
parent d7a417f36d
commit c8b2cc38de
4 changed files with 173 additions and 166 deletions

View File

@@ -116,12 +116,11 @@ interface Database<T> {
boolean addPrivateMessage(T txn, Message m, ContactId c) throws DbException;
/**
* Subscribes to the given group and returns true if the subscription did
* not previously exist.
* Subscribes to the given group.
* <p>
* Locking: subscriptions write.
*/
boolean addSubscription(T txn, Group g) throws DbException;
void addSubscription(T txn, Group g) throws DbException;
/**
* Returns true if the database contains the given contact.
@@ -407,14 +406,15 @@ interface Database<T> {
void removeMessage(T txn, MessageId m) throws DbException;
/**
* Unsubscribes from the given group and returns true if a subscription
* previously existed. Any messages belonging to the group
* are deleted from the database.
* Unsubscribes from the given group and returns the IDs of any contacts
* affected by the change. Any messages belonging to the group are deleted
* from the database.
* <p>
* Locking: contacts read, messages write, messageStatuses write,
* subscriptions write.
*/
boolean removeSubscription(T txn, GroupId g) throws DbException;
Collection<ContactId> removeSubscription(T txn, GroupId g)
throws DbException;
/**
* Sets the configuration for the given transport, replacing any existing
@@ -436,11 +436,12 @@ interface Database<T> {
/**
* Sets the local transport properties for the given transport, replacing
* any existing properties for that transport.
* any existing properties for that transport. Returns true if the
* properties have changed.
* <p>
* Locking: transports write.
*/
void setLocalProperties(T txn, TransportId t, TransportProperties p)
boolean setLocalProperties(T txn, TransportId t, TransportProperties p)
throws DbException;
/**
@@ -517,10 +518,11 @@ interface Database<T> {
/**
* Makes the given group visible to the given set of contacts and invisible
* to any other contacts.
* to any other contacts. Returns the IDs of any contacts affected by the
* change.
* <p>
* Locking: contacts read, subscriptions write.
*/
void setVisibility(T txn, GroupId g, Collection<ContactId> visible)
throws DbException;
Collection<ContactId> setVisibility(T txn, GroupId g,
Collection<ContactId> visible) throws DbException;
}

View File

@@ -11,7 +11,6 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1114,16 +1113,11 @@ DatabaseCleaner.Callback {
public void setConfig(TransportId t, TransportConfig config)
throws DbException {
boolean changed = false;
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
TransportConfig old = db.getConfig(txn, t);
if(!config.equals(old)) {
db.setConfig(txn, t, config);
changed = true;
}
db.setConfig(txn, t, config);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1132,8 +1126,6 @@ DatabaseCleaner.Callback {
} finally {
transportLock.writeLock().unlock();
}
// Call the listeners outside the lock
if(changed) callListeners(new TransportsUpdatedEvent());
}
public void setConnectionWindow(ContactId c, TransportId t,
@@ -1160,16 +1152,12 @@ DatabaseCleaner.Callback {
public void setLocalProperties(TransportId t,
TransportProperties properties) throws DbException {
boolean changed = false;
boolean changed;
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
TransportProperties old = db.getLocalTransports(txn).get(t);
if(!properties.equals(old)) {
db.setLocalProperties(txn, t, properties);
changed = true;
}
changed = db.setLocalProperties(txn, t, properties);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1278,19 +1266,16 @@ DatabaseCleaner.Callback {
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
Collection<ContactId> then, now;
Collection<ContactId> affected;
contactLock.readLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
// Get the contacts to which the group used to be visible
then = new HashSet<ContactId>(db.getVisibility(txn, g));
// Don't try to make the group visible to ex-contacts
now = new HashSet<ContactId>(visible);
now.retainAll(new HashSet<ContactId>(db.getContacts(txn)));
db.setVisibility(txn, g, now);
visible.retainAll((db.getContacts(txn)));
affected = db.setVisibility(txn, g, visible);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1302,22 +1287,19 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
// Work out which contacts were affected by the change
Collection<ContactId> affected = new ArrayList<ContactId>();
for(ContactId c : then) if(!now.contains(c)) affected.add(c);
for(ContactId c : now) if(!then.contains(c)) affected.add(c);
// Call the listeners outside the lock
callListeners(new SubscriptionsUpdatedEvent(affected));
if(!affected.isEmpty())
callListeners(new SubscriptionsUpdatedEvent(affected));
}
public void subscribe(Group g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
boolean added;
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
added = db.addSubscription(txn, g);
if(!db.containsSubscription(txn, g.getId()))
db.addSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1326,14 +1308,12 @@ DatabaseCleaner.Callback {
} finally {
subscriptionLock.writeLock().unlock();
}
// Call the listeners outside the lock
if(added) callListeners(new SubscriptionsUpdatedEvent());
}
public void unsubscribe(GroupId g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g);
boolean removed;
Collection<ContactId> affected;
boolean removed = false;
Collection<ContactId> affected = null;
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
@@ -1344,8 +1324,10 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
affected = db.getVisibility(txn, g);
removed = db.removeSubscription(txn, g);
if(db.containsSubscription(txn, g)) {
affected = db.removeSubscription(txn, g);
removed = true;
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1364,7 +1346,8 @@ DatabaseCleaner.Callback {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(removed) callListeners(new SubscriptionsUpdatedEvent(affected));
if(removed && !affected.isEmpty())
callListeners(new SubscriptionsUpdatedEvent(affected));
}
public void checkFreeSpaceAndClean() throws DbException {

View File

@@ -12,7 +12,9 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
@@ -207,6 +209,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " (contactId INT NOT NULL,"
+ " sent BIGINT NOT NULL,"
+ " received BIGINT NOT NULL,"
+ " modified BIGINT NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
@@ -216,6 +219,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " (contactId INT NOT NULL,"
+ " sent BIGINT NOT NULL,"
+ " received BIGINT NOT NULL,"
+ " modified BIGINT NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
@@ -507,23 +511,25 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.close();
// Initialise the subscription timestamps
sql = "INSERT INTO subscriptionTimestamps"
+ " (contactId, sent, received)"
+ " VALUES (?, ?, ?)";
+ " (contactId, sent, received, modified)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, 0L);
ps.setLong(3, 0L);
ps.setLong(4, 1L);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Initialise the transport timestamps
sql = "INSERT INTO transportTimestamps"
+ " (contactId, sent, received)"
+ " VALUES (?, ?, ?)";
+ " (contactId, sent, received, modified)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, 0L);
ps.setLong(3, 0L);
ps.setLong(4, 1L);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
@@ -652,20 +658,10 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean addSubscription(Connection txn, Group g) throws DbException {
public void addSubscription(Connection txn, Group g) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT NULL FROM subscriptions WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getId().getBytes());
rs = ps.executeQuery();
boolean found = rs.next();
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
if(found) return false;
sql = "INSERT INTO subscriptions"
String sql = "INSERT INTO subscriptions"
+ " (groupId, groupName, groupKey, start)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
@@ -676,7 +672,6 @@ abstract class JdbcDatabase implements Database<Connection> {
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return true;
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
@@ -1765,18 +1760,30 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean removeSubscription(Connection txn, GroupId g)
public Collection<ContactId> removeSubscription(Connection txn, GroupId g)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "DELETE FROM subscriptions WHERE groupId = ?";
// Retrieve the contacts to which the subscription is visible
String sql = "SELECT contactId FROM visibilities WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
rs = ps.executeQuery();
Collection<ContactId> visible = new ArrayList<ContactId>();
while(rs.next()) visible.add(new ContactId(rs.getInt(1)));
rs.close();
ps.close();
// Delete the subscription
sql = "DELETE FROM subscriptions WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
if(affected != 1) throw new DbStateException();
ps.close();
return affected > 0;
return visible;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
@@ -1784,31 +1791,26 @@ abstract class JdbcDatabase implements Database<Connection> {
public void setConfig(Connection txn, TransportId t, TransportConfig config)
throws DbException {
setTransportDetails(txn, t, config, "transportConfig");
}
private void setTransportDetails(Connection txn, TransportId t,
Map<String, String> details, String table) throws DbException {
PreparedStatement ps = null;
try {
// Delete any existing details for the given transport
String sql = "DELETE FROM " + table + " WHERE transportId = ?";
// Delete any existing config for the given transport
String sql = "DELETE FROM transportConfig WHERE transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, t.getInt());
ps.executeUpdate();
ps.close();
// Store the new details
sql = "INSERT INTO " + table + " (transportId, key, value)"
+ " VALUES (?, ?, ?)";
// Store the new config
sql = "INSERT INTO transportConfig (transportId, key, value)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, t.getInt());
for(Entry<String, String> e : details.entrySet()) {
for(Entry<String, String> e : config.entrySet()) {
ps.setString(2, e.getKey());
ps.setString(3, e.getValue());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != details.size())
if(batchAffected.length != config.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] != 1) throw new DbStateException();
@@ -1868,9 +1870,58 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setLocalProperties(Connection txn, TransportId t,
public boolean setLocalProperties(Connection txn, TransportId t,
TransportProperties properties) throws DbException {
setTransportDetails(txn, t, properties, "transports");
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Retrieve any existing properties for the given transport
String sql = "SELECT key, value FROM transports"
+ " WHERE transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, t.getInt());
rs = ps.executeQuery();
TransportProperties old = new TransportProperties();
while(rs.next()) old.put(rs.getString(1), rs.getString(2));
rs.close();
ps.close();
// If the properties haven't changed, return
if(old.equals(properties)) return false;
// Delete any existing properties for the given transport
sql = "DELETE FROM transports WHERE transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, t.getInt());
ps.executeUpdate();
ps.close();
// Store the new properties
sql = "INSERT INTO transports (transportId, key, value)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, t.getInt());
for(Entry<String, String> e : properties.entrySet()) {
ps.setString(2, e.getKey());
ps.setString(3, e.getValue());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != properties.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
// Update the transport timestamps of all contacts
sql = "UPDATE transportTimestamps set modified = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, System.currentTimeMillis());
ps.executeUpdate();
ps.close();
return true;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Rating setRating(Connection txn, AuthorId a, Rating r)
@@ -2179,17 +2230,29 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setVisibility(Connection txn, GroupId g,
public Collection<ContactId> setVisibility(Connection txn, GroupId g,
Collection<ContactId> visible) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Retrieve any existing visibilities
String sql = "SELECT contactId FROM visibilities WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
rs = ps.executeQuery();
Collection<ContactId> then = new HashSet<ContactId>();
while(rs.next()) then.add(new ContactId(rs.getInt(1)));
rs.close();
ps.close();
// If the visibilities haven't changed, return
Collection<ContactId> now = new HashSet<ContactId>(visible);
if(then.equals(now)) return Collections.emptyList();
// Delete any existing visibilities
String sql = "DELETE FROM visibilities WHERE groupId = ?";
sql = "DELETE FROM visibilities where groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
ps.executeUpdate();
ps.close();
if(visible.isEmpty()) return;
// Store the new visibilities
sql = "INSERT INTO visibilities (groupId, contactId)"
+ " VALUES (?, ?)";
@@ -2205,7 +2268,29 @@ abstract class JdbcDatabase implements Database<Connection> {
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
// Update the subscription timestamps of any affected contacts
Collection<ContactId> affected = new ArrayList<ContactId>();
for(ContactId c : then) if(!now.contains(c)) affected.add(c);
for(ContactId c : now) if(!then.contains(c)) affected.add(c);
sql = "UPDATE subscriptionTimestamps SET modified = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, System.currentTimeMillis());
for(ContactId c : affected) {
ps.setInt(2, c.getInt());
ps.addBatch();
}
batchAffected = ps.executeBatch();
if(batchAffected.length != affected.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] > 1) throw new DbStateException();
}
ps.close();
return affected;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}

View File

@@ -10,7 +10,6 @@ import junit.framework.TestCase;
import net.sf.briar.TestUtils;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
import net.sf.briar.api.TransportConfig;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.db.DatabaseComponent;
@@ -20,7 +19,6 @@ import net.sf.briar.api.db.event.ContactAddedEvent;
import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.MessagesAddedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
import net.sf.briar.api.db.event.TransportsUpdatedEvent;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId;
@@ -134,27 +132,26 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(database).getRemoteProperties(txn, transportId);
will(returnValue(remoteProperties));
// subscribe(group)
oneOf(database).addSubscription(txn, group);
will(returnValue(true));
oneOf(listener).eventOccurred(with(any(
SubscriptionsUpdatedEvent.class)));
// subscribe(group) again
oneOf(database).addSubscription(txn, group);
oneOf(group).getId();
will(returnValue(groupId));
oneOf(database).containsSubscription(txn, groupId);
will(returnValue(false));
oneOf(database).addSubscription(txn, group);
// subscribe(group) again
oneOf(group).getId();
will(returnValue(groupId));
oneOf(database).containsSubscription(txn, groupId);
will(returnValue(true));
// getSubscriptions()
oneOf(database).getSubscriptions(txn);
will(returnValue(Collections.singletonList(groupId)));
// unsubscribe(groupId)
oneOf(database).getVisibility(txn, groupId);
will(returnValue(Collections.<ContactId>emptySet()));
oneOf(database).removeSubscription(txn, groupId);
oneOf(database).containsSubscription(txn, groupId);
will(returnValue(true));
oneOf(listener).eventOccurred(with(any(
SubscriptionsUpdatedEvent.class)));
// unsubscribe(groupId) again
oneOf(database).getVisibility(txn, groupId);
will(returnValue(Collections.<ContactId>emptySet()));
oneOf(database).removeSubscription(txn, groupId);
will(returnValue(Collections.<ContactId>emptyList()));
// unsubscribe(groupId) again
oneOf(database).containsSubscription(txn, groupId);
will(returnValue(false));
// setConnectionWindow(contactId, 123, connectionWindow)
oneOf(database).containsContact(txn, contactId);
@@ -1278,8 +1275,6 @@ public abstract class DatabaseComponentTest extends TestCase {
throws Exception {
final TransportProperties properties =
new TransportProperties(Collections.singletonMap("bar", "baz"));
final TransportProperties properties1 =
new TransportProperties(Collections.singletonMap("baz", "bam"));
Mockery context = new Mockery();
@SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class);
@@ -1288,11 +1283,8 @@ public abstract class DatabaseComponentTest extends TestCase {
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).getLocalTransports(txn);
will(returnValue(Collections.singletonMap(transportId,
properties)));
oneOf(database).setLocalProperties(txn, transportId,
properties1);
oneOf(database).setLocalProperties(txn, transportId, properties);
will(returnValue(true));
oneOf(database).commitTransaction(txn);
oneOf(listener).eventOccurred(with(any(
TransportsUpdatedEvent.class)));
@@ -1300,7 +1292,7 @@ public abstract class DatabaseComponentTest extends TestCase {
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addListener(listener);
db.setLocalProperties(transportId, properties1);
db.setLocalProperties(transportId, properties);
context.assertIsSatisfied();
}
@@ -1318,9 +1310,8 @@ public abstract class DatabaseComponentTest extends TestCase {
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).getLocalTransports(txn);
will(returnValue(Collections.singletonMap(transportId,
properties)));
oneOf(database).setLocalProperties(txn, transportId, properties);
will(returnValue(false));
oneOf(database).commitTransaction(txn);
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
@@ -1331,60 +1322,6 @@ public abstract class DatabaseComponentTest extends TestCase {
context.assertIsSatisfied();
}
@Test
public void testTransportConfigChangedCallsListeners() throws Exception {
final TransportConfig config =
new TransportConfig(Collections.singletonMap("bar", "baz"));
final TransportConfig config1 =
new TransportConfig(Collections.singletonMap("baz", "bam"));
Mockery context = new Mockery();
@SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final DatabaseListener listener = context.mock(DatabaseListener.class);
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).getConfig(txn, transportId);
will(returnValue(config));
oneOf(database).setConfig(txn, transportId, config1);
oneOf(database).commitTransaction(txn);
oneOf(listener).eventOccurred(with(any(
TransportsUpdatedEvent.class)));
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addListener(listener);
db.setConfig(transportId, config1);
context.assertIsSatisfied();
}
@Test
public void testTransportConfigUnchangedDoesNotCallListeners()
throws Exception {
final TransportConfig config =
new TransportConfig(Collections.singletonMap("bar", "baz"));
Mockery context = new Mockery();
@SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class);
final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class);
final DatabaseListener listener = context.mock(DatabaseListener.class);
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).getConfig(txn, transportId);
will(returnValue(config));
oneOf(database).commitTransaction(txn);
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addListener(listener);
db.setConfig(transportId, config);
context.assertIsSatisfied();
}
@Test
public void testSetSeen() throws Exception {
Mockery context = new Mockery();