Add denormalised columns to statuses table.

This commit is contained in:
akwizgran
2018-02-02 16:44:35 +00:00
parent aa07d0cadd
commit 7fe502e3cc
6 changed files with 271 additions and 223 deletions

View File

@@ -96,9 +96,12 @@ interface Database<T> {
/**
* Stores a message.
*
* @param sender the contact from whom the message was received, or null
* if the message was created locally.
*/
void addMessage(T txn, Message m, State state, boolean shared)
throws DbException;
void addMessage(T txn, Message m, State state, boolean shared,
@Nullable ContactId sender) throws DbException;
/**
* Adds a dependency between two messages in the given group.
@@ -111,16 +114,6 @@ interface Database<T> {
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
/**
* Initialises the status of the given message with respect to the given
* contact.
*
* @param ack whether the message needs to be acknowledged.
* @param seen whether the contact has seen the message.
*/
void addStatus(T txn, ContactId c, MessageId m, boolean ack, boolean seen)
throws DbException;
/**
* Stores a transport.
*/
@@ -279,7 +272,7 @@ interface Database<T> {
* <p/>
* Read-only.
*/
Collection<ContactId> getGroupVisibility(T txn, GroupId g)
Map<ContactId, Boolean> getGroupVisibility(T txn, GroupId g)
throws DbException;
/**
@@ -573,13 +566,6 @@ interface Database<T> {
*/
void removeMessage(T txn, MessageId m) throws DbException;
/**
* Removes an offered message that was offered by the given contact, or
* returns false if there is no such message.
*/
boolean removeOfferedMessage(T txn, ContactId c, MessageId m)
throws DbException;
/**
* Removes the given offered messages that were offered by the given
* contact.
@@ -587,12 +573,6 @@ interface Database<T> {
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/**
* Removes the status of the given message with respect to the given
* contact.
*/
void removeStatus(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
*/

View File

@@ -213,7 +213,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsGroup(txn, m.getGroupId()))
throw new NoSuchGroupException();
if (!db.containsMessage(txn, m.getId())) {
addMessage(txn, m, DELIVERED, shared, null);
db.addMessage(txn, m, DELIVERED, shared, null);
transaction.attach(new MessageAddedEvent(m, null));
transaction.attach(new MessageStateChangedEvent(m.getId(), true,
DELIVERED));
@@ -222,16 +222,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.mergeMessageMetadata(txn, m.getId(), meta);
}
private void addMessage(T txn, Message m, State state, boolean shared,
@Nullable ContactId sender) throws DbException {
db.addMessage(txn, m, state, shared);
for (ContactId c : db.getGroupVisibility(txn, m.getGroupId())) {
boolean offered = db.removeOfferedMessage(txn, c, m.getId());
boolean seen = offered || (sender != null && c.equals(sender));
db.addStatus(txn, c, m.getId(), seen, seen);
}
}
@Override
public void addTransport(Transaction transaction, TransportId t,
int maxLatency) throws DbException {
@@ -673,7 +663,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.raiseSeenFlag(txn, c, m.getId());
db.raiseAckFlag(txn, c, m.getId());
} else {
addMessage(txn, m, UNKNOWN, false, c);
db.addMessage(txn, m, UNKNOWN, false, c);
transaction.attach(new MessageAddedEvent(m, c));
}
transaction.attach(new MessageToAckEvent(c));
@@ -741,7 +731,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
GroupId id = g.getId();
if (!db.containsGroup(txn, id))
throw new NoSuchGroupException();
Collection<ContactId> affected = db.getGroupVisibility(txn, id);
Collection<ContactId> affected =
db.getGroupVisibility(txn, id).keySet();
db.removeGroup(txn, id);
transaction.attach(new GroupRemovedEvent(g));
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
@@ -811,19 +802,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchGroupException();
Visibility old = db.getGroupVisibility(txn, c, g);
if (old == v) return;
if (old == INVISIBLE) {
db.addGroupVisibility(txn, c, g, v == SHARED);
for (MessageId m : db.getMessageIds(txn, g)) {
boolean seen = db.removeOfferedMessage(txn, c, m);
db.addStatus(txn, c, m, seen, seen);
}
} else if (v == INVISIBLE) {
db.removeGroupVisibility(txn, c, g);
for (MessageId m : db.getMessageIds(txn, g))
db.removeStatus(txn, c, m);
} else {
db.setGroupVisibility(txn, c, g, v == SHARED);
}
if (old == INVISIBLE) db.addGroupVisibility(txn, c, g, v == SHARED);
else if (v == INVISIBLE) db.removeGroupVisibility(txn, c, g);
else db.setGroupVisibility(txn, c, g, v == SHARED);
List<ContactId> affected = Collections.singletonList(c);
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
}

View File

@@ -71,7 +71,7 @@ import static org.briarproject.bramble.db.ExponentialBackoff.calculateExpiry;
abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing
static final int CODE_SCHEMA_VERSION = 34;
static final int CODE_SCHEMA_VERSION = 35;
private static final String CREATE_SETTINGS =
"CREATE TABLE settings"
@@ -189,6 +189,13 @@ abstract class JdbcDatabase implements Database<Connection> {
"CREATE TABLE statuses"
+ " (messageId _HASH NOT NULL,"
+ " contactId INT NOT NULL,"
+ " groupId _HASH NOT NULL," // Denormalised
+ " timestamp BIGINT NOT NULL," // Denormalised
+ " length INT NOT NULL," // Denormalised
+ " state INT NOT NULL," // Denormalised
+ " groupShared BOOLEAN NOT NULL," // Denormalised
+ " messageShared BOOLEAN NOT NULL," // Denormalised
+ " deleted BOOLEAN NOT NULL," // Denormalised
+ " ack BOOLEAN NOT NULL,"
+ " seen BOOLEAN NOT NULL,"
+ " requested BOOLEAN NOT NULL,"
@@ -200,6 +207,9 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (groupId)"
+ " REFERENCES groups (groupId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_TRANSPORTS =
@@ -253,6 +263,14 @@ abstract class JdbcDatabase implements Database<Connection> {
"CREATE INDEX IF NOT EXISTS messageMetadataByGroupIdState"
+ " ON messageMetadata (groupId, state)";
private static final String INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID =
"CREATE INDEX IF NOT EXISTS statusesByContactIdGroupId"
+ " ON statuses (contactId, groupId)";
private static final String INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP =
"CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp"
+ " ON statuses (contactId, timestamp)";
private static final Logger LOG =
Logger.getLogger(JdbcDatabase.class.getName());
@@ -401,6 +419,8 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(INDEX_CONTACTS_BY_AUTHOR_ID);
s.executeUpdate(INDEX_GROUPS_BY_CLIENT_ID);
s.executeUpdate(INDEX_MESSAGE_METADATA_BY_GROUP_ID_STATE);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP);
s.close();
} catch (SQLException e) {
tryToClose(s);
@@ -581,7 +601,7 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override
public void addGroupVisibility(Connection txn, ContactId c, GroupId g,
boolean shared) throws DbException {
boolean groupShared) throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO groupVisibilities"
@@ -590,16 +610,50 @@ abstract class JdbcDatabase implements Database<Connection> {
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, g.getBytes());
ps.setBoolean(3, shared);
ps.setBoolean(3, groupShared);
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
// Create a status row for each message in the group
addStatus(txn, c, g, groupShared);
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
private void addStatus(Connection txn, ContactId c, GroupId g,
boolean groupShared) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messageId, timestamp, state, shared,"
+ " length, raw IS NULL"
+ " FROM messages"
+ " WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
rs = ps.executeQuery();
while (rs.next()) {
MessageId id = new MessageId(rs.getBytes(1));
long timestamp = rs.getLong(2);
State state = State.fromValue(rs.getInt(3));
boolean messageShared = rs.getBoolean(4);
int length = rs.getInt(5);
boolean deleted = rs.getBoolean(6);
boolean seen = removeOfferedMessage(txn, c, id);
addStatus(txn, id, c, g, timestamp, length, state, groupShared,
messageShared, deleted, seen);
}
rs.close();
ps.close();
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
@Override
public void addLocalAuthor(Connection txn, LocalAuthor a)
throws DbException {
@@ -627,7 +681,8 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override
public void addMessage(Connection txn, Message m, State state,
boolean shared) throws DbException {
boolean messageShared, @Nullable ContactId sender)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO messages (messageId, groupId, timestamp,"
@@ -638,13 +693,24 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setBytes(2, m.getGroupId().getBytes());
ps.setLong(3, m.getTimestamp());
ps.setInt(4, state.getValue());
ps.setBoolean(5, shared);
ps.setBoolean(5, messageShared);
byte[] raw = m.getRaw();
ps.setInt(6, raw.length);
ps.setBytes(7, raw);
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
// Create a status row for each contact that can see the group
Map<ContactId, Boolean> visibility =
getGroupVisibility(txn, m.getGroupId());
for (Entry<ContactId, Boolean> e : visibility.entrySet()) {
ContactId c = e.getKey();
boolean offered = removeOfferedMessage(txn, c, m.getId());
boolean seen = offered || (sender != null && c.equals(sender));
addStatus(txn, m.getId(), c, m.getGroupId(), m.getTimestamp(),
m.getLength(), state, e.getValue(), messageShared,
false, seen);
}
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
@@ -682,19 +748,28 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void addStatus(Connection txn, ContactId c, MessageId m, boolean ack,
boolean seen) throws DbException {
private void addStatus(Connection txn, MessageId m, ContactId c, GroupId g,
long timestamp, int length, State state, boolean groupShared,
boolean messageShared, boolean deleted, boolean seen)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO statuses (messageId, contactId, ack,"
+ " seen, requested, expiry, txCount)"
+ " VALUES (?, ?, ?, ?, FALSE, 0, 0)";
String sql = "INSERT INTO statuses (messageId, contactId, groupId,"
+ " timestamp, length, state, groupShared, messageShared,"
+ " deleted, ack, seen, requested, expiry, txCount)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setBoolean(3, ack);
ps.setBoolean(4, seen);
ps.setBytes(3, g.getBytes());
ps.setLong(4, timestamp);
ps.setInt(5, length);
ps.setInt(6, state.getValue());
ps.setBoolean(7, groupShared);
ps.setBoolean(8, messageShared);
ps.setBoolean(9, deleted);
ps.setBoolean(10, seen);
ps.setBoolean(11, seen);
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
@@ -946,12 +1021,9 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT NULL FROM messages AS m"
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " WHERE messageId = ?"
+ " AND contactId = ?"
+ " AND m.shared = TRUE";
String sql = "SELECT NULL FROM statuses"
+ " WHERE messageId = ? AND contactId = ?"
+ " AND messageShared = TRUE";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
@@ -1003,6 +1075,13 @@ abstract class JdbcDatabase implements Database<Connection> {
if (affected < 0) throw new DbStateException();
if (affected > 1) throw new DbStateException();
ps.close();
// Update denormalised column in statuses
sql = "UPDATE statuses SET deleted = TRUE WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
affected = ps.executeUpdate();
if (affected < 0) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
@@ -1231,18 +1310,19 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public Collection<ContactId> getGroupVisibility(Connection txn, GroupId g)
public Map<ContactId, Boolean> getGroupVisibility(Connection txn, GroupId g)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT contactId FROM groupVisibilities"
String sql = "SELECT contactId, shared FROM groupVisibilities"
+ " WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
rs = ps.executeQuery();
List<ContactId> visible = new ArrayList<>();
while (rs.next()) visible.add(new ContactId(rs.getInt(1)));
Map<ContactId, Boolean> visible = new HashMap<>();
while (rs.next())
visible.put(new ContactId(rs.getInt(1)), rs.getBoolean(2));
rs.close();
ps.close();
return visible;
@@ -1524,12 +1604,8 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT m.messageId, txCount > 0, seen"
+ " FROM messages AS m"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " WHERE groupId = ?"
+ " AND contactId = ?";
String sql = "SELECT messageId, txCount > 0, seen FROM statuses"
+ " WHERE groupId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getBytes());
ps.setInt(2, c.getInt());
@@ -1552,15 +1628,13 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public MessageStatus getMessageStatus(Connection txn,
ContactId c, MessageId m) throws DbException {
public MessageStatus getMessageStatus(Connection txn, ContactId c,
MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT txCount > 0, seen"
+ " FROM statuses"
+ " WHERE messageId = ?"
+ " AND contactId = ?";
String sql = "SELECT txCount > 0, seen FROM statuses"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
@@ -1702,14 +1776,10 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND gv.contactId = s.contactId"
+ " WHERE gv.contactId = ? AND gv.shared = TRUE"
+ " AND state = ? AND m.shared = TRUE AND raw IS NOT NULL"
String sql = "SELECT messageId FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = FALSE"
+ " AND expiry < ?"
+ " ORDER BY timestamp LIMIT ?";
@@ -1763,14 +1833,10 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT length, m.messageId FROM messages AS m"
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND gv.contactId = s.contactId"
+ " WHERE gv.contactId = ? AND gv.shared = TRUE"
+ " AND state = ? AND m.shared = TRUE AND raw IS NOT NULL"
String sql = "SELECT length, messageId FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE"
+ " AND expiry < ?"
+ " ORDER BY timestamp";
@@ -1834,8 +1900,8 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public Collection<MessageId> getMessagesToShare(
Connection txn, ClientId c) throws DbException {
public Collection<MessageId> getMessagesToShare(Connection txn, ClientId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1894,14 +1960,10 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT length, m.messageId FROM messages AS m"
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND gv.contactId = s.contactId"
+ " WHERE gv.contactId = ? AND gv.shared = TRUE"
+ " AND state = ? AND m.shared = TRUE AND raw IS NOT NULL"
String sql = "SELECT length, messageId FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = TRUE"
+ " AND expiry < ?"
+ " ORDER BY timestamp";
@@ -2380,6 +2442,8 @@ abstract class JdbcDatabase implements Database<Connection> {
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
// Remove status rows for the messages in the group
for (MessageId m : getMessageIds(txn, g)) removeStatus(txn, c, m);
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
@@ -2419,8 +2483,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public boolean removeOfferedMessage(Connection txn, ContactId c,
private boolean removeOfferedMessage(Connection txn, ContactId c,
MessageId m) throws DbException {
PreparedStatement ps = null;
try {
@@ -2464,16 +2527,15 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void removeStatus(Connection txn, ContactId c, MessageId m)
private void removeStatus(Connection txn, ContactId c, MessageId m)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "DELETE FROM statuses"
+ " WHERE contactId = ? AND messageId = ?";
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, m.getBytes());
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
@@ -2569,6 +2631,16 @@ abstract class JdbcDatabase implements Database<Connection> {
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
// Update denormalised column in statuses
sql = "UPDATE statuses SET groupShared = ?"
+ " WHERE contactId = ? AND groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBoolean(1, shared);
ps.setInt(2, c.getInt());
ps.setBytes(3, g.getBytes());
affected = ps.executeUpdate();
if (affected < 0) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
@@ -2587,6 +2659,14 @@ abstract class JdbcDatabase implements Database<Connection> {
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
// Update denormalised column in statuses
sql = "UPDATE statuses SET messageShared = TRUE"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
affected = ps.executeUpdate();
if (affected < 0) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
@@ -2613,6 +2693,14 @@ abstract class JdbcDatabase implements Database<Connection> {
affected = ps.executeUpdate();
if (affected < 0) throw new DbStateException();
ps.close();
// Update denormalised column in statuses
sql = "UPDATE statuses SET state = ? WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, state.getValue());
ps.setBytes(2, m.getBytes());
affected = ps.executeUpdate();
if (affected < 0) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);