Replaced the Status enum with a seen flag and an expiry time.

This commit is contained in:
akwizgran
2013-02-04 17:48:30 +00:00
parent 30a0269652
commit 5150737476
8 changed files with 135 additions and 183 deletions

View File

@@ -104,12 +104,13 @@ interface Database<T> {
void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException;
/**
* Records a collection of sent messages as needing to be acknowledged.
* Records the given messages as needing to be acknowledged by the given
* expiry time.
* <p>
* Locking: contact read, message write.
*/
void addOutstandingMessages(T txn, ContactId c, Collection<MessageId> sent)
throws DbException;
void addOutstandingMessages(T txn, ContactId c, Collection<MessageId> sent,
long expiry) throws DbException;
/**
* Stores the given message, or returns false if the message is already in
@@ -128,6 +129,15 @@ interface Database<T> {
void addSecrets(T txn, Collection<TemporarySecret> secrets)
throws DbException;
/**
* Initialises the status (seen or unseen) of the given message with
* respect to the given contact.
* <p>
* Locking: contact read, message write.
*/
void addStatus(T txn, ContactId c, MessageId m, boolean seen)
throws DbException;
/**
* Subscribes to the given group.
* <p>
@@ -614,14 +624,6 @@ interface Database<T> {
boolean setStarredFlag(T txn, MessageId m, boolean starred)
throws DbException;
/**
* Sets the status of the given message with respect to the given contact.
* <p>
* Locking: contact read, message write.
*/
void setStatus(T txn, ContactId c, MessageId m, Status s)
throws DbException;
/**
* If the database contains the given message and it belongs to a group
* that is visible to the given contact, marks the message as seen by the

View File

@@ -7,8 +7,6 @@ import static net.sf.briar.db.DatabaseConstants.CRITICAL_FREE_SPACE;
import static net.sf.briar.db.DatabaseConstants.MAX_BYTES_BETWEEN_SPACE_CHECKS;
import static net.sf.briar.db.DatabaseConstants.MAX_MS_BETWEEN_SPACE_CHECKS;
import static net.sf.briar.db.DatabaseConstants.MIN_FREE_SPACE;
import static net.sf.briar.db.Status.NEW;
import static net.sf.briar.db.Status.SEEN;
import java.io.IOException;
import java.util.ArrayList;
@@ -277,11 +275,11 @@ DatabaseCleaner.Callback {
boolean stored = db.addGroupMessage(txn, m);
// Mark the message as seen by the sender
MessageId id = m.getId();
if(sender != null) db.setStatus(txn, sender, id, SEEN);
if(sender != null) db.addStatus(txn, sender, id, true);
if(stored) {
// Mark the message as unseen by other contacts
for(ContactId c : db.getContacts(txn)) {
if(!c.equals(sender)) db.setStatus(txn, c, id, NEW);
if(!c.equals(sender)) db.addStatus(txn, c, id, false);
}
// Calculate and store the message's sendability
int sendability = calculateSendability(txn, m);
@@ -441,8 +439,8 @@ DatabaseCleaner.Callback {
if(m.getAuthor() != null) throw new IllegalArgumentException();
if(!db.addPrivateMessage(txn, m, c)) return false;
MessageId id = m.getId();
if(incoming) db.setStatus(txn, c, id, SEEN);
else db.setStatus(txn, c, id, NEW);
if(incoming) db.addStatus(txn, c, id, true);
else db.addStatus(txn, c, id, false);
// Count the bytes stored
synchronized(spaceLock) {
bytesStoredSinceLastCheck += m.getSerialised().length;
@@ -526,7 +524,8 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
db.addOutstandingMessages(txn, c, ids);
// FIXME: Calculate the expiry time
db.addOutstandingMessages(txn, c, ids, Long.MAX_VALUE);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -585,7 +584,8 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
db.addOutstandingMessages(txn, c, ids);
// FIXME: Calculate the expiry time
db.addOutstandingMessages(txn, c, ids, Long.MAX_VALUE);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);

View File

@@ -4,6 +4,7 @@ import java.sql.Connection;
import java.util.concurrent.Executor;
import net.sf.briar.api.clock.Clock;
import net.sf.briar.api.clock.SystemClock;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseConfig;
import net.sf.briar.api.db.DatabaseExecutor;
@@ -41,7 +42,7 @@ public class DatabaseModule extends AbstractModule {
@Provides
Database<Connection> getDatabase(DatabaseConfig config) {
return new H2Database(config);
return new H2Database(config, new SystemClock());
}
@Provides @Singleton

View File

@@ -8,6 +8,7 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.Properties;
import net.sf.briar.api.clock.Clock;
import net.sf.briar.api.crypto.Password;
import net.sf.briar.api.db.DatabaseConfig;
import net.sf.briar.api.db.DbException;
@@ -29,8 +30,8 @@ class H2Database extends JdbcDatabase {
private final long maxSize;
@Inject
H2Database(DatabaseConfig config) {
super(HASH_TYPE, BINARY_TYPE, COUNTER_TYPE, SECRET_TYPE);
H2Database(DatabaseConfig config, Clock clock) {
super(HASH_TYPE, BINARY_TYPE, COUNTER_TYPE, SECRET_TYPE, clock);
home = new File(config.getDataDirectory(), "db");
url = "jdbc:h2:split:" + home.getPath()
+ ";CIPHER=AES;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=false";

View File

@@ -5,9 +5,6 @@ import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static net.sf.briar.api.Rating.UNRATED;
import static net.sf.briar.db.DatabaseConstants.RETENTION_MODULUS;
import static net.sf.briar.db.Status.NEW;
import static net.sf.briar.db.Status.SEEN;
import static net.sf.briar.db.Status.SENT;
import java.io.File;
import java.io.FileNotFoundException;
@@ -31,6 +28,7 @@ import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
import net.sf.briar.api.TransportConfig;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.clock.Clock;
import net.sf.briar.api.db.DbClosedException;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader;
@@ -157,7 +155,9 @@ abstract class JdbcDatabase implements Database<Connection> {
"CREATE TABLE statuses"
+ " (messageId HASH NOT NULL,"
+ " contactId INT NOT NULL,"
+ " status SMALLINT NOT NULL,"
+ " seen BOOLEAN NOT NULL,"
+ " transmissionCount INT NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (messageId)"
+ " REFERENCES messages (messageId)"
@@ -298,6 +298,7 @@ abstract class JdbcDatabase implements Database<Connection> {
// Different database libraries use different names for certain types
private final String hashType, binaryType, counterType, secretType;
private final Clock clock;
private final LinkedList<Connection> connections =
new LinkedList<Connection>(); // Locking: self
@@ -308,11 +309,12 @@ abstract class JdbcDatabase implements Database<Connection> {
protected abstract Connection createConnection() throws SQLException;
JdbcDatabase(String hashType, String binaryType, String counterType,
String secretType) {
String secretType, Clock clock) {
this.hashType = hashType;
this.binaryType = binaryType;
this.counterType = counterType;
this.secretType = secretType;
this.clock = clock;
}
protected void open(boolean resume, File dir, String driverClass)
@@ -646,18 +648,19 @@ abstract class JdbcDatabase implements Database<Connection> {
}
public void addOutstandingMessages(Connection txn, ContactId c,
Collection<MessageId> sent) throws DbException {
Collection<MessageId> sent, long expiry) throws DbException {
PreparedStatement ps = null;
try {
// Set the status of each message to SENT if it's currently NEW
String sql = "UPDATE statuses SET status = ?"
+ " WHERE messageId = ? AND contactId = ? AND status = ?";
// Update the transmission count and expiry time of each message
String sql = "UPDATE statuses SET expiry = ?,"
+ " transmissionCount = transmissionCount + ?"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setShort(1, (short) SENT.ordinal());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) NEW.ordinal());
ps.setLong(1, expiry);
ps.setInt(2, 1);
ps.setInt(4, c.getInt());
for(MessageId m : sent) {
ps.setBytes(2, m.getBytes());
ps.setBytes(3, m.getBytes());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
@@ -705,6 +708,26 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addStatus(Connection txn, ContactId c, MessageId m,
boolean seen) throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO statuses"
+ " (messageId, contactId, seen, transmissionCount, expiry)"
+ " VALUES (?, ?, ?, ZERO(), ZERO())";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setBoolean(3, seen);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void addSubscription(Connection txn, Group g) throws DbException {
PreparedStatement ps = null;
try {
@@ -1220,6 +1243,7 @@ abstract class JdbcDatabase implements Database<Connection> {
public Collection<MessageId> getMessagesToOffer(Connection txn,
ContactId c, int maxMessages) throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1227,11 +1251,11 @@ abstract class JdbcDatabase implements Database<Connection> {
String sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " WHERE m.contactId = ? AND status = ?"
+ " WHERE m.contactId = ? AND seen = FALSE AND expiry < ?"
+ " ORDER BY timestamp DESC LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) NEW.ordinal());
ps.setLong(2, now);
ps.setInt(3, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
@@ -1254,12 +1278,12 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND timestamp >= retention"
+ " AND status = ?"
+ " AND seen = FALSE AND expiry < ?"
+ " AND sendability > ZERO()"
+ " ORDER BY timestamp DESC LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) NEW.ordinal());
ps.setLong(2, now);
ps.setInt(3, maxMessages - ids.size());
rs = ps.executeQuery();
while(rs.next()) ids.add(new MessageId(rs.getBytes(2)));
@@ -1383,6 +1407,7 @@ abstract class JdbcDatabase implements Database<Connection> {
public byte[] getRawMessageIfSendable(Connection txn, ContactId c,
MessageId m) throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1391,11 +1416,11 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " WHERE m.messageId = ? AND m.contactId = ?"
+ " AND status = ?";
+ " AND seen = FALSE AND expiry < ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setShort(3, (short) NEW.ordinal());
ps.setLong(3, now);
rs = ps.executeQuery();
byte[] raw = null;
if(rs.next()) {
@@ -1422,12 +1447,12 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " WHERE m.messageId = ?"
+ " AND cg.contactId = ?"
+ " AND timestamp >= retention"
+ " AND status = ?"
+ " AND seen = FALSE AND expiry < ?"
+ " AND sendability > ZERO()";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setShort(3, (short) NEW.ordinal());
ps.setLong(3, now);
rs = ps.executeQuery();
if(rs.next()) {
int length = rs.getInt(1);
@@ -1631,6 +1656,7 @@ abstract class JdbcDatabase implements Database<Connection> {
public Collection<MessageId> getSendableMessages(Connection txn,
ContactId c, int maxLength) throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1638,11 +1664,11 @@ abstract class JdbcDatabase implements Database<Connection> {
String sql = "SELECT length, m.messageId FROM messages AS m"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " WHERE m.contactId = ? AND status = ?"
+ " WHERE m.contactId = ? AND seen = FALSE AND expiry < ?"
+ " ORDER BY timestamp DESC";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) NEW.ordinal());
ps.setLong(2, now);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
int total = 0;
@@ -1669,12 +1695,12 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND timestamp >= retention"
+ " AND status = ?"
+ " AND seen = FALSE AND expiry < ?"
+ " AND sendability > ZERO()"
+ " ORDER BY timestamp DESC";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) NEW.ordinal());
ps.setLong(2, now);
rs = ps.executeQuery();
while(rs.next()) {
int length = rs.getInt(1);
@@ -1988,6 +2014,7 @@ abstract class JdbcDatabase implements Database<Connection> {
public boolean hasSendableMessages(Connection txn, ContactId c)
throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1995,11 +2022,11 @@ abstract class JdbcDatabase implements Database<Connection> {
String sql = "SELECT m.messageId FROM messages AS m"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " WHERE m.contactId = ? AND status = ?"
+ " WHERE m.contactId = ? AND seen = FALSE AND expiry < ?"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) NEW.ordinal());
ps.setLong(2, now);
ps.setInt(3, 1);
rs = ps.executeQuery();
boolean found = rs.next();
@@ -2021,12 +2048,12 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND timestamp >= retention"
+ " AND status = ?"
+ " AND seen = FALSE AND expiry < ?"
+ " AND sendability > ZERO()"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setShort(2, (short) NEW.ordinal());
ps.setLong(2, now);
ps.setInt(3, 1);
rs = ps.executeQuery();
found = rs.next();
@@ -2100,13 +2127,11 @@ abstract class JdbcDatabase implements Database<Connection> {
Collection<MessageId> acked) throws DbException {
PreparedStatement ps = null;
try {
// Set the status of each message to SEEN if it's currently SENT
String sql = "UPDATE statuses SET status = ?"
+ " WHERE messageId = ? AND contactId = ? AND status = ?";
// Set the status of each message to seen = true
String sql = "UPDATE statuses SET seen = TRUE"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setShort(1, (short) SEEN.ordinal());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) SENT.ordinal());
ps.setInt(1, c.getInt());
for(MessageId m : acked) {
ps.setBytes(2, m.getBytes());
ps.addBatch();
@@ -2612,55 +2637,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setStatus(Connection txn, ContactId c, MessageId m, Status s)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT status FROM statuses"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
rs = ps.executeQuery();
if(rs.next()) {
// A status row exists - update it if necessary
Status old = Status.values()[rs.getByte(1)];
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
if(old != SEEN && old != s) {
sql = "UPDATE statuses SET status = ?"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setShort(1, (short) s.ordinal());
ps.setBytes(2, m.getBytes());
ps.setInt(3, c.getInt());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
}
} else {
// No status row exists - create one
rs.close();
ps.close();
sql = "INSERT INTO statuses (messageId, contactId, status)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setShort(3, (short) s.ordinal());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
}
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public boolean setStatusSeenIfVisible(Connection txn, ContactId c,
MessageId m) throws DbException {
PreparedStatement ps = null;
@@ -2686,10 +2662,10 @@ abstract class JdbcDatabase implements Database<Connection> {
rs.close();
ps.close();
if(!found) return false;
sql = "UPDATE statuses SET status = ?"
sql = "UPDATE statuses SET seen = ?"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setShort(1, (short) SEEN.ordinal());
ps.setBoolean(1, true);
ps.setBytes(2, m.getBytes());
ps.setInt(3, c.getInt());
int affected = ps.executeUpdate();

View File

@@ -1,11 +0,0 @@
package net.sf.briar.db;
/** The status of a message with respect to a particular contact. */
enum Status {
/** The message has not been sent, received, or acked. */
NEW,
/** The message has been sent, but not received or acked. */
SENT,
/** The message has been received or acked. */
SEEN
}