More efficient database joins.

Don't keep message status rows for groups that are invisible to the contact - this avoids the need to join the groupVisibilities table when selecting messages to offer or send. Add or remove status rows when group visibility changes.
This commit is contained in:
akwizgran
2016-02-12 11:44:07 +00:00
parent e979f17e45
commit 6b76b75d08
5 changed files with 93 additions and 151 deletions

View File

@@ -188,11 +188,6 @@ interface Database<T> {
*/
Contact getContact(T txn, ContactId c) throws DbException;
/**
* Returns the IDs of all contacts.
*/
Collection<ContactId> getContactIds(T txn) throws DbException;
/**
* Returns all contacts.
*/
@@ -240,6 +235,11 @@ interface Database<T> {
*/
Collection<LocalAuthor> getLocalAuthors(T txn) throws DbException;
/**
* Returns the IDs of all messages in the given group.
*/
Collection<MessageId> getMessageIds(T txn, GroupId g) throws DbException;
/**
* Returns the metadata for all messages in the given group.
*/
@@ -424,6 +424,12 @@ interface Database<T> {
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/**
* Removes the status of the given message with respect to the given
* contact.
*/
void removeStatus(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
*/

View File

@@ -56,7 +56,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -70,12 +69,6 @@ import static org.briarproject.api.sync.ValidationManager.Validity.UNKNOWN;
import static org.briarproject.api.sync.ValidationManager.Validity.VALID;
import static org.briarproject.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
/**
* An implementation of DatabaseComponent using reentrant read-write locks.
* Depending on the JVM's lock implementation, this implementation may allow
* writers to starve. LockFairnessTest can be used to test whether this
* implementation is safe on a given JVM.
*/
class DatabaseComponentImpl<T> implements DatabaseComponent {
private static final Logger LOG =
@@ -177,7 +170,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsGroup(txn, m.getGroupId()))
throw new NoSuchGroupException();
if (!db.containsMessage(txn, m.getId())) {
addMessage(txn, m, VALID, shared, null);
addMessage(txn, m, VALID, shared);
transaction.attach(new MessageAddedEvent(m, null));
transaction.attach(new MessageValidatedEvent(m, c, true, true));
if (shared) transaction.attach(new MessageSharedEvent(m));
@@ -185,26 +178,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.mergeMessageMetadata(txn, m.getId(), meta);
}
/**
* Stores a message and initialises its status with respect to each contact.
*
* @param sender null for a locally generated message.
*/
private void addMessage(T txn, Message m, Validity validity, boolean shared,
ContactId sender) throws DbException {
private void addMessage(T txn, Message m, Validity validity, boolean shared)
throws DbException {
db.addMessage(txn, m, validity, shared);
GroupId g = m.getGroupId();
Collection<ContactId> visibility = db.getVisibility(txn, g);
visibility = new HashSet<ContactId>(visibility);
for (ContactId c : db.getContactIds(txn)) {
if (visibility.contains(c)) {
boolean offered = db.removeOfferedMessage(txn, c, m.getId());
boolean seen = offered || c.equals(sender);
db.addStatus(txn, c, m.getId(), offered, seen);
} else {
if (c.equals(sender)) throw new IllegalStateException();
db.addStatus(txn, c, m.getId(), false, false);
}
for (ContactId c : db.getVisibility(txn, m.getGroupId())) {
boolean offered = db.removeOfferedMessage(txn, c, m.getId());
db.addStatus(txn, c, m.getId(), offered, offered);
}
}
@@ -516,7 +495,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchContactException();
if (db.containsVisibleGroup(txn, c, m.getGroupId())) {
if (!db.containsMessage(txn, m.getId())) {
addMessage(txn, m, UNKNOWN, false, c);
addMessage(txn, m, UNKNOWN, false);
transaction.attach(new MessageAddedEvent(m, c));
}
db.raiseAckFlag(txn, c, m.getId());
@@ -529,8 +508,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
int count = db.countOfferedMessages(txn, c);
boolean ack = false, request = false;
int count = db.countOfferedMessages(txn, c);
for (MessageId m : o.getMessageIds()) {
if (db.containsVisibleMessage(txn, c, m)) {
db.raiseSeenFlag(txn, c, m);
@@ -638,8 +617,17 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsGroup(txn, g))
throw new NoSuchGroupException();
boolean wasVisible = db.containsVisibleGroup(txn, c, g);
if (visible && !wasVisible) db.addVisibility(txn, c, g);
else if (!visible && wasVisible) db.removeVisibility(txn, c, g);
if (visible && !wasVisible) {
db.addVisibility(txn, c, g);
for (MessageId m : db.getMessageIds(txn, g)) {
boolean seen = db.removeOfferedMessage(txn, c, m);
db.addStatus(txn, c, m, seen, seen);
}
} else if (!visible && wasVisible) {
db.removeVisibility(txn, c, g);
for (MessageId m : db.getMessageIds(txn, g))
db.removeStatus(txn, c, m);
}
if (visible != wasVisible) {
List<ContactId> affected = Collections.singletonList(c);
transaction.attach(new GroupVisibilityUpdatedEvent(affected));

View File

@@ -63,15 +63,15 @@ import static org.briarproject.db.ExponentialBackoff.calculateExpiry;
*/
abstract class JdbcDatabase implements Database<Connection> {
private static final int SCHEMA_VERSION = 21;
private static final int MIN_SCHEMA_VERSION = 21;
private static final int SCHEMA_VERSION = 22;
private static final int MIN_SCHEMA_VERSION = 22;
private static final String CREATE_SETTINGS =
"CREATE TABLE settings"
+ " (key VARCHAR NOT NULL,"
+ " (namespace VARCHAR NOT NULL,"
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " namespace VARCHAR NOT NULL,"
+ " PRIMARY KEY (key, namespace))";
+ " PRIMARY KEY (namespace, key))";
private static final String CREATE_LOCAL_AUTHORS =
"CREATE TABLE localAuthors"
@@ -468,31 +468,6 @@ abstract class JdbcDatabase implements Database<Connection> {
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
// Create a status row for each message
sql = "SELECT messageID FROM messages";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Collection<byte[]> ids = new ArrayList<byte[]>();
while (rs.next()) ids.add(rs.getBytes(1));
rs.close();
ps.close();
if (!ids.isEmpty()) {
sql = "INSERT INTO statuses (messageId, contactId, ack,"
+ " seen, requested, expiry, txCount)"
+ " VALUES (?, ?, FALSE, FALSE, FALSE, 0, 0)";
ps = txn.prepareStatement(sql);
ps.setInt(2, c.getInt());
for (byte[] id : ids) {
ps.setBytes(1, id);
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != ids.size())
throw new DbStateException();
for (int rows : batchAffected)
if (rows != 1) throw new DbStateException();
ps.close();
}
return c;
} catch (SQLException e) {
tryToClose(rs);
@@ -973,26 +948,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<ContactId> getContactIds(Connection txn)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT contactId FROM contacts";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
List<ContactId> ids = new ArrayList<ContactId>();
while (rs.next()) ids.add(new ContactId(rs.getInt(1)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Collection<Contact> getContacts(Connection txn)
throws DbException {
PreparedStatement ps = null;
@@ -1151,6 +1106,27 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<MessageId> getMessageIds(Connection txn, GroupId g)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messageId FROM messages WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Map<MessageId, Metadata> getMessageMetadata(Connection txn,
GroupId g) throws DbException {
PreparedStatement ps = null;
@@ -1309,15 +1285,12 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
String sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND gv.contactId = s.contactId"
+ " WHERE gv.contactId = ?"
+ " WHERE contactId = ?"
+ " AND valid = ? AND shared = TRUE AND raw IS NOT NULL"
+ " AND seen = FALSE AND requested = FALSE"
+ " AND s.expiry < ?"
+ " AND expiry < ?"
+ " ORDER BY timestamp DESC LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1368,15 +1341,12 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
String sql = "SELECT length, m.messageId FROM messages AS m"
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND gv.contactId = s.contactId"
+ " WHERE gv.contactId = ?"
+ " WHERE contactId = ?"
+ " AND valid = ? AND shared = TRUE AND raw IS NOT NULL"
+ " AND seen = FALSE"
+ " AND s.expiry < ?"
+ " AND expiry < ?"
+ " ORDER BY timestamp DESC";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1454,15 +1424,12 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
String sql = "SELECT length, m.messageId FROM messages AS m"
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND gv.contactId = s.contactId"
+ " WHERE gv.contactId = ?"
+ " WHERE contactId = ?"
+ " AND valid = ? AND shared = TRUE AND raw IS NOT NULL"
+ " AND seen = FALSE AND requested = TRUE"
+ " AND s.expiry < ?"
+ " AND expiry < ?"
+ " ORDER BY timestamp DESC";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1777,12 +1744,12 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
// Update any settings that already exist
String sql = "UPDATE settings SET value = ?"
+ " WHERE key = ? AND namespace = ?";
+ " WHERE namespace = ? AND key = ?";
ps = txn.prepareStatement(sql);
for (Entry<String, String> e : s.entrySet()) {
ps.setString(1, e.getValue());
ps.setString(2, e.getKey());
ps.setString(3, namespace);
ps.setString(2, namespace);
ps.setString(3, e.getKey());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
@@ -1792,15 +1759,15 @@ abstract class JdbcDatabase implements Database<Connection> {
if (rows > 1) throw new DbStateException();
}
// Insert any settings that don't already exist
sql = "INSERT INTO settings (key, value, namespace)"
sql = "INSERT INTO settings (namespace, key, value)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
int updateIndex = 0, inserted = 0;
for (Entry<String, String> e : s.entrySet()) {
if (batchAffected[updateIndex] == 0) {
ps.setString(1, e.getKey());
ps.setString(2, e.getValue());
ps.setString(3, namespace);
ps.setString(1, namespace);
ps.setString(2, e.getKey());
ps.setString(3, e.getValue());
ps.addBatch();
inserted++;
}
@@ -1976,6 +1943,24 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void removeStatus(Connection txn, ContactId c, MessageId m)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "DELETE FROM statuses"
+ " WHERE contactId = ? AND messageId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, m.getBytes());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void removeTransport(Connection txn, TransportId t)
throws DbException {
PreparedStatement ps = null;

View File

@@ -261,8 +261,6 @@ public class DatabaseComponentImplTest extends BriarTestCase {
oneOf(database).mergeMessageMetadata(txn, messageId, metadata);
oneOf(database).getVisibility(txn, groupId);
will(returnValue(Collections.singletonList(contactId)));
oneOf(database).getContactIds(txn);
will(returnValue(Collections.singletonList(contactId)));
oneOf(database).removeOfferedMessage(txn, contactId, messageId);
will(returnValue(false));
oneOf(database).addStatus(txn, contactId, messageId, false, false);
@@ -1074,11 +1072,9 @@ public class DatabaseComponentImplTest extends BriarTestCase {
oneOf(database).addMessage(txn, message, UNKNOWN, false);
oneOf(database).getVisibility(txn, groupId);
will(returnValue(Collections.singletonList(contactId)));
oneOf(database).getContactIds(txn);
will(returnValue(Collections.singletonList(contactId)));
oneOf(database).removeOfferedMessage(txn, contactId, messageId);
will(returnValue(false));
oneOf(database).addStatus(txn, contactId, messageId, false, true);
oneOf(database).addStatus(txn, contactId, messageId, false, false);
oneOf(database).raiseAckFlag(txn, contactId, messageId);
oneOf(database).commitTransaction(txn);
// The message was received and added
@@ -1270,6 +1266,11 @@ public class DatabaseComponentImplTest extends BriarTestCase {
oneOf(database).containsVisibleGroup(txn, contactId, groupId);
will(returnValue(false)); // Not yet visible
oneOf(database).addVisibility(txn, contactId, groupId);
oneOf(database).getMessageIds(txn, groupId);
will(returnValue(Collections.singletonList(messageId)));
oneOf(database).removeOfferedMessage(txn, contactId, messageId);
will(returnValue(false));
oneOf(database).addStatus(txn, contactId, messageId, false, false);
oneOf(database).commitTransaction(txn);
oneOf(eventBus).broadcast(with(any(
GroupVisibilityUpdatedEvent.class)));

View File

@@ -302,44 +302,6 @@ public class H2DatabaseTest extends BriarTestCase {
db.close();
}
@Test
public void testSendableMessagesMustBeVisible() throws Exception {
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
// Add a contact, a group and a message
db.addLocalAuthor(txn, localAuthor);
assertEquals(contactId, db.addContact(txn, author, localAuthorId));
db.addGroup(txn, group);
db.addMessage(txn, message, VALID, true);
db.addStatus(txn, contactId, messageId, false, false);
// The group is not visible to the contact, so the message
// should not be sendable
Collection<MessageId> ids = db.getMessagesToSend(txn, contactId,
ONE_MEGABYTE);
assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100);
assertTrue(ids.isEmpty());
// Making the group visible should make the message sendable
db.addVisibility(txn, contactId, groupId);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE);
assertEquals(Collections.singletonList(messageId), ids);
ids = db.getMessagesToOffer(txn, contactId, 100);
assertEquals(Collections.singletonList(messageId), ids);
// Making the group invisible should make the message unsendable
db.removeVisibility(txn, contactId, groupId);
ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE);
assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100);
assertTrue(ids.isEmpty());
db.commitTransaction(txn);
db.close();
}
@Test
public void testMessagesToAck() throws Exception {
Database<Connection> db = open(false);