Moved the subscription and transport timestamps out of the contacts

table so it's not necessary to hold a write lock on the (heavily used)
contacts table to update them.
This commit is contained in:
akwizgran
2011-08-14 14:46:12 +02:00
parent 2c13e35dc4
commit 5e0aadd373
12 changed files with 122 additions and 63 deletions

View File

@@ -9,6 +9,6 @@ import net.sf.briar.api.protocol.Group;
public interface SubscriptionWriter {
/** Writes the contents of the update. */
void writeSubscriptionUpdate(Map<Group, Long> subs, long timestamp)
void writeSubscriptions(Map<Group, Long> subs, long timestamp)
throws IOException;
}

View File

@@ -7,6 +7,6 @@ import java.util.Map;
public interface TransportWriter {
/** Writes the contents of the update. */
void writeTransportUpdate(Map<String, Map<String, String>> transports,
void writeTransports(Map<String, Map<String, String>> transports,
long timestamp) throws IOException;
}

View File

@@ -436,7 +436,7 @@ interface Database<T> {
* Sets the subscriptions for the given contact, replacing any existing
* subscriptions unless the existing subscriptions have a newer timestamp.
* <p>
* Locking: contacts write, subscriptions write.
* Locking: contacts read, subscriptions write.
*/
void setSubscriptions(T txn, ContactId c, Map<Group, Long> subs,
long timestamp) throws DbException;
@@ -464,7 +464,7 @@ interface Database<T> {
* existing properties unless the existing properties have a newer
* timestamp.
* <p>
* Locking: contacts write, transports write.
* Locking: contacts read, transports write.
*/
void setTransports(T txn, ContactId c,
Map<String, Map<String, String>> transports, long timestamp)

View File

@@ -77,8 +77,6 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String CREATE_CONTACTS =
"CREATE TABLE contacts"
+ " (contactId INT NOT NULL,"
+ " subscriptionsTimestamp BIGINT NOT NULL,"
+ " transportsTimestamp BIGINT NOT NULL,"
+ " secret BINARY NOT NULL,"
+ " PRIMARY KEY (contactId))";
@@ -166,26 +164,26 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String CREATE_CONTACT_TRANSPORTS =
"CREATE TABLE contactTransports"
+ " (contactId INT NOT NULL,"
+ " transportName VARCHAR NOT NULL,"
+ " name VARCHAR NOT NULL,"
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (contactId, transportName, key),"
+ " PRIMARY KEY (contactId, name, key),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_TRANSPORTS =
"CREATE TABLE transports"
+ " (transportName VARCHAR NOT NULL,"
+ " (name VARCHAR NOT NULL,"
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (transportName, key))";
+ " PRIMARY KEY (name, key))";
private static final String CREATE_TRANSPORT_CONFIG =
"CREATE TABLE transportConfig"
+ " (transportName VARCHAR NOT NULL,"
+ " (name VARCHAR NOT NULL,"
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (transportName, key))";
+ " PRIMARY KEY (name, key))";
private static final String CREATE_CONNECTION_WINDOWS =
"CREATE TABLE connectionWindows"
@@ -197,6 +195,24 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_SUBSCRIPTION_TIMESTAMPS =
"CREATE TABLE subscriptionTimestamps"
+ " (contactId INT NOT NULL,"
+ " sent BIGINT NOT NULL,"
+ " received BIGINT NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_TRANSPORT_TIMESTAMPS =
"CREATE TABLE transportTimestamps"
+ " (contactId INT NOT NULL,"
+ " sent BIGINT NOT NULL,"
+ " received BIGINT NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final Logger LOG =
Logger.getLogger(JdbcDatabase.class.getName());
@@ -282,6 +298,8 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(insertTypeNames(CREATE_TRANSPORTS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_CONFIG));
s.executeUpdate(insertTypeNames(CREATE_CONNECTION_WINDOWS));
s.executeUpdate(insertTypeNames(CREATE_SUBSCRIPTION_TIMESTAMPS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_TIMESTAMPS));
s.close();
} catch(SQLException e) {
tryToClose(s);
@@ -450,20 +468,16 @@ abstract class JdbcDatabase implements Database<Connection> {
rs.close();
ps.close();
// Create a new contact row
sql = "INSERT INTO contacts (contactId, subscriptionsTimestamp,"
+ " transportsTimestamp, secret)"
+ " VALUES (?, ?, ?, ?)";
sql = "INSERT INTO contacts (contactId, secret) VALUES (?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, 0L);
ps.setLong(3, 0L);
ps.setBytes(4, secret);
ps.setBytes(2, secret);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Store the contact's transport properties
sql = "INSERT INTO contactTransports"
+ " (contactId, transportName, key, value)"
+ " (contactId, name, key, value)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -483,6 +497,28 @@ abstract class JdbcDatabase implements Database<Connection> {
if(batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
// Initialise the subscription timestamps
sql = "INSERT INTO subscriptionTimestamps"
+ " (contactId, sent, received)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, 0L);
ps.setLong(3, 0L);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Initialise the transport timestamps
sql = "INSERT INTO transportTimestamps"
+ " (contactId, sent, received)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, 0L);
ps.setLong(3, 0L);
affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return c;
} catch(SQLException e) {
tryToClose(ps);
@@ -1199,7 +1235,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
String sql = "SELECT key, value FROM transportConfig"
+ " WHERE transportName = ?";
+ " WHERE name = ?";
ps = txn.prepareStatement(sql);
ps.setString(1, name);
rs = ps.executeQuery();
@@ -1220,9 +1256,8 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT transportName, key, value"
+ " FROM transports"
+ " ORDER BY transportName";
String sql = "SELECT name, key, value FROM transports"
+ " ORDER BY name";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Map<String, Map<String, String>> transports =
@@ -1252,10 +1287,9 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT transportName, key, value"
+ " FROM contactTransports"
String sql = "SELECT name, key, value FROM contactTransports"
+ " WHERE contactId = ?"
+ " ORDER BY transportName";
+ " ORDER BY name";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
@@ -1747,7 +1781,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
// Return if the timestamp isn't fresh
String sql = "SELECT subscriptionsTimestamp FROM contacts"
String sql = "SELECT received FROM subscriptionTimestamps"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1786,7 +1820,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
ps.close();
// Update the timestamp
sql = "UPDATE contacts SET subscriptionsTimestamp = ?"
sql = "UPDATE subscriptionTimestamps SET received = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
@@ -1800,6 +1834,25 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setSubscriptionTimestamp(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE subscriptionTimestamps SET sent = ?"
+ " WHERE contactId = ? AND sent < ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
ps.setInt(2, c.getInt());
ps.setLong(3, timestamp);
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setTransportConfig(Connection txn, String name,
Map<String, String> config) throws DbException {
setTransportDetails(txn, name, config, "transportConfig");
@@ -1810,13 +1863,13 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
try {
// Delete any existing details for the named transport
String sql = "DELETE FROM " + table + " WHERE transportName = ?";
String sql = "DELETE FROM " + table + " WHERE name = ?";
ps = txn.prepareStatement(sql);
ps.setString(1, name);
ps.executeUpdate();
ps.close();
// Store the new details
sql = "INSERT INTO " + table + " (transportName, key, value)"
sql = "INSERT INTO " + table + " (name, key, value)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setString(1, name);
@@ -1850,7 +1903,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
// Return if the timestamp isn't fresh
String sql = "SELECT transportsTimestamp FROM contacts"
String sql = "SELECT received FROM transportTimestamps"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1868,8 +1921,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.executeUpdate();
ps.close();
// Store the new transports
sql = "INSERT INTO contactTransports"
+ " (contactId, transportName, key, value)"
sql = "INSERT INTO contactTransports (contactId, name, key, value)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1890,7 +1942,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
ps.close();
// Update the timestamp
sql = "UPDATE contacts SET transportsTimestamp = ?"
sql = "UPDATE transportTimestamps SET received = ?"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
@@ -1904,6 +1956,25 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setTransportTimestamp(Connection txn, ContactId c,
long timestamp) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE transportTimestamps SET sent = ?"
+ " WHERE contactId = ? AND sent < ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, timestamp);
ps.setInt(2, c.getInt());
ps.setLong(3, timestamp);
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setVisibility(Connection txn, GroupId g,
Collection<ContactId> visible) throws DbException {
PreparedStatement ps = null;

View File

@@ -445,7 +445,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
Txn txn = db.startTransaction();
try {
Map<Group, Long> subs = db.getVisibleSubscriptions(txn, c);
s.writeSubscriptionUpdate(subs, System.currentTimeMillis());
s.writeSubscriptions(subs, System.currentTimeMillis());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
@@ -475,8 +475,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
try {
Map<String, Map<String, String>> transports =
db.getTransports(txn);
long timestamp = System.currentTimeMillis();
t.writeTransportUpdate(transports, timestamp);
t.writeTransports(transports, System.currentTimeMillis());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
@@ -834,7 +833,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s)
throws DbException {
// Update the contact's subscriptions
contactLock.writeLock().lock();
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
subscriptionLock.writeLock().lock();
@@ -854,14 +853,14 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
contactLock.readLock().unlock();
}
}
public void receiveTransportUpdate(ContactId c, TransportUpdate t)
throws DbException {
// Update the contact's transport properties
contactLock.writeLock().lock();
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.writeLock().lock();
@@ -883,7 +882,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
transportLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
contactLock.readLock().unlock();
}
}

View File

@@ -339,7 +339,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
Txn txn = db.startTransaction();
try {
Map<Group, Long> subs = db.getVisibleSubscriptions(txn, c);
s.writeSubscriptionUpdate(subs, System.currentTimeMillis());
s.writeSubscriptions(subs, System.currentTimeMillis());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
@@ -363,8 +363,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
try {
Map<String, Map<String, String>> transports =
db.getTransports(txn);
long timestamp = System.currentTimeMillis();
t.writeTransportUpdate(transports, timestamp);
t.writeTransports(transports, System.currentTimeMillis());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);

View File

@@ -20,7 +20,7 @@ class SubscriptionWriterImpl implements SubscriptionWriter {
w = writerFactory.createWriter(out);
}
public void writeSubscriptionUpdate(Map<Group, Long> subs, long timestamp)
public void writeSubscriptions(Map<Group, Long> subs, long timestamp)
throws IOException {
w.writeUserDefinedTag(Tags.SUBSCRIPTION_UPDATE);
w.writeMap(subs);

View File

@@ -20,9 +20,8 @@ class TransportWriterImpl implements TransportWriter {
w = writerFactory.createWriter(out);
}
public void writeTransportUpdate(
Map<String, Map<String, String>> transports, long timestamp)
throws IOException {
public void writeTransports(Map<String, Map<String, String>> transports,
long timestamp) throws IOException {
w.writeUserDefinedTag(Tags.TRANSPORT_UPDATE);
w.writeListStart();
for(Entry<String, Map<String, String>> e : transports.entrySet()) {

View File

@@ -172,11 +172,11 @@ public class FileReadWriteTest extends TestCase {
Map<Group, Long> subs = new LinkedHashMap<Group, Long>();
subs.put(group, 0L);
subs.put(group1, 0L);
s.writeSubscriptionUpdate(subs, timestamp);
s.writeSubscriptions(subs, timestamp);
packetWriter.finishPacket();
TransportWriter t = protocolWriterFactory.createTransportWriter(out);
t.writeTransportUpdate(transports, timestamp);
t.writeTransports(transports, timestamp);
packetWriter.finishPacket();
out.flush();

View File

@@ -779,7 +779,7 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(database).getVisibleSubscriptions(txn, contactId);
will(returnValue(Collections.singletonMap(group, 0L)));
// Add the subscriptions to the writer
oneOf(subscriptionWriter).writeSubscriptionUpdate(
oneOf(subscriptionWriter).writeSubscriptions(
with(Collections.singletonMap(group, 0L)),
with(any(long.class)));
}});
@@ -812,7 +812,7 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(database).getTransports(txn);
will(returnValue(transports));
// Add the properties to the writer
oneOf(transportWriter).writeTransportUpdate(with(transports),
oneOf(transportWriter).writeTransports(with(transports),
with(any(long.class)));
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);

View File

@@ -1200,15 +1200,12 @@ public class H2DatabaseTest extends TestCase {
// Add a contact and subscribe to a group
assertEquals(contactId, db.addContact(txn, transports, secret));
db.addSubscription(txn, group);
// The group should not be visible to the contact
assertEquals(Collections.emptyList(), db.getVisibility(txn, groupId));
// Make the group visible to the contact
db.setVisibility(txn, groupId, Collections.singleton(contactId));
assertEquals(Collections.singletonList(contactId),
db.getVisibility(txn, groupId));
// Make the group invisible again
db.setVisibility(txn, groupId, Collections.<ContactId>emptySet());
assertEquals(Collections.emptyList(), db.getVisibility(txn, groupId));
@@ -1225,10 +1222,8 @@ public class H2DatabaseTest extends TestCase {
// Add a contact
assertEquals(contactId, db.addContact(txn, transports, secret));
// Get the connection window for a new transport
ConnectionWindow w = db.getConnectionWindow(txn, contactId, 123);
// The connection window should exist and be in the initial state
assertNotNull(w);
assertEquals(0L, w.getCentre());
@@ -1245,19 +1240,15 @@ public class H2DatabaseTest extends TestCase {
// Add a contact
assertEquals(contactId, db.addContact(txn, transports, secret));
// Get the connection window for a new transport
ConnectionWindow w = db.getConnectionWindow(txn, contactId, 123);
// The connection window should exist and be in the initial state
assertNotNull(w);
assertEquals(0L, w.getCentre());
assertEquals(0, w.getBitmap());
// Update the connection window and store it
w.setSeen(5L);
db.setConnectionWindow(txn, contactId, 123, w);
// Check that the connection window was stored
w = db.getConnectionWindow(txn, contactId, 123);
assertNotNull(w);

View File

@@ -95,10 +95,10 @@ public class ProtocolReadWriteTest extends TestCase {
r.writeRequest(offerId, bitSet, 10);
SubscriptionWriter s = writerFactory.createSubscriptionWriter(out);
s.writeSubscriptionUpdate(subscriptions, timestamp);
s.writeSubscriptions(subscriptions, timestamp);
TransportWriter t = writerFactory.createTransportWriter(out);
t.writeTransportUpdate(transports, timestamp);
t.writeTransports(transports, timestamp);
// Read
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());