Code and API cleanup for the database component.

This commit is contained in:
akwizgran
2013-01-17 16:28:09 +00:00
parent 50ad1f486e
commit 77a5fea5c8
5 changed files with 113 additions and 182 deletions

View File

@@ -256,15 +256,6 @@ interface Database<T> {
*/
Collection<Transport> getLocalTransports(T txn) throws DbException;
/**
* Returns the IDs of any messages sent to the given contact that should
* now be considered lost.
* <p>
* Locking: contact read, message read, messageStatus read.
*/
Collection<MessageId> getLostMessages(T txn, ContactId c)
throws DbException;
/**
* Returns the message identified by the given ID, in serialised form.
* <p>
@@ -307,14 +298,24 @@ interface Database<T> {
throws DbException;
/**
* Returns the IDs of any messages received from the given contact that
* need to be acknowledged.
* Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages.
* <p>
* Locking: contact read, messageStatus read.
*/
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given number of messages.
* <p>
* Locking: contact read, message read, messageStatus read,
* subscription read.
*/
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException;
/**
* Returns the number of children of the message identified by the given
* ID that are present in the database and have sendability scores greater
@@ -324,16 +325,6 @@ interface Database<T> {
*/
int getNumberOfSendableChildren(T txn, MessageId m) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given number of messages.
* <p>
* Locking: contact read, message read, messageStatus read,
* subscription read.
*/
Collection<MessageId> getOfferableMessages(T txn, ContactId c,
int maxMessages) throws DbException;
/**
* Returns the IDs of the oldest messages in the database, with a total
* size less than or equal to the given size.
@@ -354,7 +345,7 @@ interface Database<T> {
* <p>
* Locking: message read, messageFlag read.
*/
boolean getRead(T txn, MessageId m) throws DbException;
boolean getReadFlag(T txn, MessageId m) throws DbException;
/**
* Returns all remote properties for the given transport.
@@ -394,7 +385,7 @@ interface Database<T> {
* <p>
* Locking: message read, messageFlag read.
*/
boolean getStarred(T txn, MessageId m) throws DbException;
boolean getStarredFlag(T txn, MessageId m) throws DbException;
/**
* Returns the groups to which the user subscribes.
@@ -500,8 +491,8 @@ interface Database<T> {
* <p>
* Locking: contact read, message read, messageStatus write.
*/
void removeAckedMessages(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
void removeOutstandingMessages(T txn, ContactId c,
Collection<MessageId> acked) throws DbException;
/**
* Marks the given messages received from the given contact as having been
@@ -520,16 +511,6 @@ interface Database<T> {
*/
void removeContact(T txn, ContactId c) throws DbException;
/**
* Removes outstanding messages that have been lost. Any messages that are
* still considered outstanding (Status.SENT) with respect to the given
* contact are now considered unsent (Status.NEW).
* <p>
* Locking: contact read, message read, messageStatus write.
*/
void removeLostMessages(T txn, ContactId c, Collection<MessageId> lost)
throws DbException;
/**
* Removes a message (and all associated state) from the database.
* <p>

View File

@@ -612,7 +612,7 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
offered = db.getOfferableMessages(txn, c, maxMessages);
offered = db.getMessagesToOffer(txn, c, maxMessages);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1048,12 +1048,7 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
// Mark all acked messages as seen
db.removeAckedMessages(txn, c, a.getMessageIds());
// Find any lost messages that need to be retransmitted
// FIXME: Merge these methods
Collection<MessageId> lost = db.getLostMessages(txn, c);
if(!lost.isEmpty()) db.removeLostMessages(txn, c, lost);
db.removeOutstandingMessages(txn, c, a.getMessageIds());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);

View File

@@ -64,9 +64,9 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String CREATE_MESSAGES =
"CREATE TABLE messages"
+ " (messageId HASH NOT NULL,"
+ " parentId HASH," // Null for the first message in a thread
+ " parentId HASH," // Null for the first msg in a thread
+ " groupId HASH," // Null for private messages
+ " authorId HASH," // Null for private or anonymous messages
+ " authorId HASH," // Null for private or anonymous msgs
+ " subject VARCHAR NOT NULL,"
+ " timestamp BIGINT NOT NULL,"
+ " length INT NOT NULL,"
@@ -76,7 +76,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " sendability INT," // Null for private messages
+ " contactId INT," // Null for group messages
+ " PRIMARY KEY (messageId),"
+ " FOREIGN KEY (groupId) REFERENCES subscriptions (groupId)"
+ " FOREIGN KEY (groupId)"
+ " REFERENCES subscriptions (groupId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
@@ -101,7 +102,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " deleted BIGINT NOT NULL,"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (groupId) REFERENCES subscriptions (groupId)"
+ " FOREIGN KEY (groupId)"
+ " REFERENCES subscriptions (groupId)"
+ " ON DELETE CASCADE)";
private static final String INDEX_VISIBILITIES_BY_GROUP =
@@ -537,16 +539,29 @@ abstract class JdbcDatabase implements Database<Connection> {
public void addMessageToAck(Connection txn, ContactId c, MessageId m)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "INSERT INTO messagesToAck (messageId, contactId)"
String sql = "SELECT NULL FROM messagesToAck"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
rs = ps.executeQuery();
boolean found = rs.next();
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
if(found) return;
sql = "INSERT INTO messagesToAck (messageId, contactId)"
+ " VALUES (?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
if(affected != 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
@@ -1117,12 +1132,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<MessageId> getLostMessages(Connection txn, ContactId c)
throws DbException {
// FIXME: Retransmission
return Collections.emptyList();
}
public byte[] getMessage(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -1319,6 +1328,64 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<MessageId> getMessagesToOffer(Connection txn,
ContactId c, int maxMessages) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Do we have any sendable private messages?
String sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " WHERE m.contactId = ? AND status = ?"
+ " ORDER BY timestamp"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) Status.NEW.ordinal());
ps.setInt(3, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
while(rs.next()) ids.add(new MessageId(rs.getBytes(2)));
rs.close();
ps.close();
if(ids.size() == maxMessages)
return Collections.unmodifiableList(ids);
// Do we have any sendable group messages?
sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN contactSubscriptions AS cs"
+ " ON m.groupId = cs.groupId"
+ " JOIN visibilities AS v"
+ " ON m.groupId = v.groupId"
+ " AND cs.contactId = v.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND cs.contactId = s.contactId"
+ " JOIN subscriptionTimes AS st"
+ " ON cs.contactId = st.contactId"
+ " WHERE cs.contactId = ?"
+ " AND timestamp >= start"
+ " AND timestamp >= expiry"
+ " AND status = ?"
+ " AND sendability > ZERO()"
+ " ORDER BY timestamp"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) Status.NEW.ordinal());
ps.setInt(3, maxMessages - ids.size());
rs = ps.executeQuery();
while(rs.next()) ids.add(new MessageId(rs.getBytes(2)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public int getNumberOfSendableChildren(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
@@ -1403,7 +1470,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean getRead(Connection txn, MessageId m) throws DbException {
public boolean getReadFlag(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1519,64 +1586,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<MessageId> getOfferableMessages(Connection txn,
ContactId c, int maxMessages) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Do we have any sendable private messages?
String sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " WHERE m.contactId = ? AND status = ?"
+ " ORDER BY timestamp"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) Status.NEW.ordinal());
ps.setInt(3, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
while(rs.next()) ids.add(new MessageId(rs.getBytes(2)));
rs.close();
ps.close();
if(ids.size() == maxMessages)
return Collections.unmodifiableList(ids);
// Do we have any sendable group messages?
sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN contactSubscriptions AS cs"
+ " ON m.groupId = cs.groupId"
+ " JOIN visibilities AS v"
+ " ON m.groupId = v.groupId"
+ " AND cs.contactId = v.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND cs.contactId = s.contactId"
+ " JOIN subscriptionTimes AS st"
+ " ON cs.contactId = st.contactId"
+ " WHERE cs.contactId = ?"
+ " AND timestamp >= start"
+ " AND timestamp >= expiry"
+ " AND status = ?"
+ " AND sendability > ZERO()"
+ " ORDER BY timestamp"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) Status.NEW.ordinal());
ps.setInt(3, maxMessages - ids.size());
rs = ps.executeQuery();
while(rs.next()) ids.add(new MessageId(rs.getBytes(2)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Collection<MessageId> getSendableMessages(Connection txn,
ContactId c, int maxLength) throws DbException {
PreparedStatement ps = null;
@@ -1641,7 +1650,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean getStarred(Connection txn, MessageId m) throws DbException {
public boolean getStarredFlag(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1975,28 +1984,24 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void removeAckedMessages(Connection txn, ContactId c,
public void removeOutstandingMessages(Connection txn, ContactId c,
Collection<MessageId> acked) throws DbException {
setStatus(txn, c, acked, Status.SEEN);
}
private void setStatus(Connection txn, ContactId c,
Collection<MessageId> ids, Status newStatus) throws DbException {
PreparedStatement ps = null;
try {
// Set the status of each message if it's currently SENT
// Set the status of each message to SEEN if it's currently SENT
String sql = "UPDATE statuses SET status = ?"
+ " WHERE messageId = ? AND contactId = ? AND status = ?";
ps = txn.prepareStatement(sql);
ps.setShort(1, (short) newStatus.ordinal());
ps.setShort(1, (short) Status.SEEN.ordinal());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) Status.SENT.ordinal());
for(MessageId m : ids) {
for(MessageId m : acked) {
ps.setBytes(2, m.getBytes());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != ids.size()) throw new DbStateException();
if(batchAffected.length != acked.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] > 1) throw new DbStateException();
}
@@ -2048,11 +2053,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void removeLostMessages(Connection txn, ContactId c,
Collection<MessageId> lost) throws DbException {
setStatus(txn, c, lost, Status.NEW);
}
public void removeMessage(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
try {

View File

@@ -755,7 +755,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
// Get the sendable message IDs
oneOf(database).getOfferableMessages(txn, contactId, 123);
oneOf(database).getMessagesToOffer(txn, contactId, 123);
will(returnValue(offerable));
// Create the packet
oneOf(packetFactory).createOffer(offerable);
@@ -898,11 +898,8 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
// Get the acked messages
oneOf(ack).getMessageIds();
will(returnValue(Collections.singletonList(messageId)));
oneOf(database).removeAckedMessages(txn, contactId,
oneOf(database).removeOutstandingMessages(txn, contactId,
Collections.singletonList(messageId));
// Find lost messages
oneOf(database).getLostMessages(txn, contactId);
will(returnValue(Collections.emptyList()));
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner,
shutdown, packetFactory);

View File

@@ -556,7 +556,7 @@ public class H2DatabaseTest extends BriarTestCase {
}
@Test
public void testRemoveAckedMessage() throws Exception {
public void testOutstandingMessageAcked() throws Exception {
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
@@ -584,7 +584,7 @@ public class H2DatabaseTest extends BriarTestCase {
assertFalse(it.hasNext());
// Pretend that the message was acked
db.removeAckedMessages(txn, contactId,
db.removeOutstandingMessages(txn, contactId,
Collections.singletonList(messageId));
// The message still should not be sendable
@@ -595,48 +595,6 @@ public class H2DatabaseTest extends BriarTestCase {
db.close();
}
@Test
public void testRemoveLostMessage() throws Exception {
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
// Add a contact, subscribe to a group and store a message
assertEquals(contactId, db.addContact(txn));
db.addSubscription(txn, group);
db.addVisibility(txn, contactId, groupId);
db.addSubscription(txn, contactId, group, 0L);
db.addGroupMessage(txn, message);
db.setSendability(txn, messageId, 1);
db.setStatus(txn, contactId, messageId, Status.NEW);
// Get the message and mark it as sent
Iterator<MessageId> it =
db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator();
assertTrue(it.hasNext());
assertEquals(messageId, it.next());
assertFalse(it.hasNext());
db.setStatus(txn, contactId, messageId, Status.SENT);
db.addOutstandingMessages(txn, contactId,
Collections.singletonList(messageId));
// The message should no longer be sendable
it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator();
assertFalse(it.hasNext());
// Pretend that the message was lost
db.removeLostMessages(txn, contactId,
Collections.singletonList(messageId));
// The message should be sendable again
it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator();
assertTrue(it.hasNext());
assertEquals(messageId, it.next());
assertFalse(it.hasNext());
db.commitTransaction(txn);
db.close();
}
@Test
public void testGetMessagesByAuthor() throws Exception {
AuthorId authorId1 = new AuthorId(TestUtils.getRandomId());
@@ -1467,12 +1425,12 @@ public class H2DatabaseTest extends BriarTestCase {
db.addGroupMessage(txn, message);
// The message should be unread by default
assertFalse(db.getRead(txn, messageId));
assertFalse(db.getReadFlag(txn, messageId));
// Marking the message read should return the old value
assertFalse(db.setRead(txn, messageId, true));
assertTrue(db.setRead(txn, messageId, true));
// The message should be read
assertTrue(db.getRead(txn, messageId));
assertTrue(db.getReadFlag(txn, messageId));
// Marking the message unread should return the old value
assertTrue(db.setRead(txn, messageId, false));
assertFalse(db.setRead(txn, messageId, false));
@@ -1493,12 +1451,12 @@ public class H2DatabaseTest extends BriarTestCase {
db.addGroupMessage(txn, message);
// The message should be unstarred by default
assertFalse(db.getStarred(txn, messageId));
assertFalse(db.getStarredFlag(txn, messageId));
// Starring the message should return the old value
assertFalse(db.setStarred(txn, messageId, true));
assertTrue(db.setStarred(txn, messageId, true));
// The message should be starred
assertTrue(db.getStarred(txn, messageId));
assertTrue(db.getStarredFlag(txn, messageId));
// Unstarring the message should return the old value
assertTrue(db.setStarred(txn, messageId, false));
assertFalse(db.setStarred(txn, messageId, false));