Removed message expiry code. #180

This commit is contained in:
akwizgran
2015-12-16 11:56:19 +00:00
parent 01ecfb435a
commit 6e61504d24
29 changed files with 104 additions and 1330 deletions

View File

@@ -15,8 +15,6 @@ import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageHeader;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -379,14 +377,6 @@ interface Database<T> {
Collection<MessageId> getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
/**
* Returns the IDs of the oldest messages in the database, with a total
* size less than or equal to the given size.
* <p>
* Locking: read.
*/
Collection<MessageId> getOldMessages(T txn, int size) throws DbException;
/**
* Returns the parent of the given message, or null if either the message
* has no parent, or the parent is absent from the database, or the parent
@@ -428,22 +418,6 @@ interface Database<T> {
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
int maxLength) throws DbException;
/**
* Returns a retention ack for the given contact, or null if no ack is due.
* <p>
* Locking: write.
*/
RetentionAck getRetentionAck(T txn, ContactId c) throws DbException;
/**
* Returns a retention update for the given contact and updates its expiry
* time using the given latency, or returns null if no update is due.
* <p>
* Locking: write.
*/
RetentionUpdate getRetentionUpdate(T txn, ContactId c, int maxLatency)
throws DbException;
/**
* Returns all settings.
* <p>
@@ -532,14 +506,6 @@ interface Database<T> {
void incrementStreamCounter(T txn, ContactId c, TransportId t,
long rotationPeriod) throws DbException;
/**
* Increments the retention time versions for all contacts to indicate that
* the database's retention time has changed and updates should be sent.
* <p>
* Locking: write.
*/
void incrementRetentionVersions(T txn) throws DbException;
/**
* Marks the given messages as not needing to be acknowledged to the
* given contact.
@@ -728,25 +694,6 @@ interface Database<T> {
boolean setRemoteProperties(T txn, ContactId c, TransportId t,
TransportProperties p, long version) throws DbException;
/**
* Sets the retention time of the given contact's database and returns
* true, unless an update with an equal or higher version number has
* already been received from the contact.
* <p>
* Locking: write.
*/
boolean setRetentionTime(T txn, ContactId c, long retention, long version)
throws DbException;
/**
* Records a retention ack from the given contact for the given version,
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: write.
*/
void setRetentionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
/**
* Records a subscription ack from the given contact for the given version,
* unless the contact has already acked an equal or higher version.

View File

@@ -1,34 +0,0 @@
package org.briarproject.db;
import org.briarproject.api.db.DbException;
interface DatabaseCleaner {
/**
* Starts a new thread to monitor the amount of free storage space
* available to the database and expire old messages as necessary. The
* cleaner will pause for the given number of milliseconds between sweeps.
*/
void startCleaning(Callback callback, long msBetweenSweeps);
/** Tells the cleaner thread to exit. */
void stopCleaning();
interface Callback {
/**
* Checks how much free storage space is available to the database, and
* if necessary expires old messages until the free space is at least
* DatabaseConstants.MIN_FREE_SPACE. If the free space is less than
* DatabaseConstants.CRITICAL_FREE_SPACE and there are no more messages
* to expire, an Error will be thrown.
*/
void checkFreeSpaceAndClean() throws DbException;
/**
* Returns true if the amount of free storage space available to the
* database should be checked.
*/
boolean shouldCheckFreeSpace();
}
}

View File

@@ -1,54 +0,0 @@
package org.briarproject.db;
import static java.util.logging.Level.WARNING;
import java.util.TimerTask;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.db.DbClosedException;
import org.briarproject.api.db.DbException;
import org.briarproject.api.system.Timer;
class DatabaseCleanerImpl extends TimerTask implements DatabaseCleaner {
private static final Logger LOG =
Logger.getLogger(DatabaseCleanerImpl.class.getName());
private final Timer timer;
private volatile Callback callback = null;
@Inject
DatabaseCleanerImpl(Timer timer) {
this.timer = timer;
}
public void startCleaning(Callback callback, long msBetweenSweeps) {
this.callback = callback;
timer.scheduleAtFixedRate(this, 0, msBetweenSweeps);
}
public void stopCleaning() {
timer.cancel();
}
public void run() {
if (callback == null) throw new IllegalStateException();
try {
if (callback.shouldCheckFreeSpace()) {
LOG.info("Checking free space");
callback.checkFreeSpaceAndClean();
}
} catch (DbClosedException e) {
LOG.info("Database closed, exiting");
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
throw new Error(e); // Kill the application
} catch (RuntimeException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
throw new Error(e); // Kill the application
}
}
}

View File

@@ -26,13 +26,11 @@ import org.briarproject.api.event.LocalAuthorRemovedEvent;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageExpiredEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.MessagesAckedEvent;
import org.briarproject.api.event.MessagesSentEvent;
import org.briarproject.api.event.RemoteRetentionTimeUpdatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.event.SettingsUpdatedEvent;
@@ -49,8 +47,6 @@ import org.briarproject.api.sync.MessageHeader;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -71,13 +67,8 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.db.DatabaseConstants.BYTES_PER_SWEEP;
import static org.briarproject.db.DatabaseConstants.CRITICAL_FREE_SPACE;
import static org.briarproject.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.db.DatabaseConstants.MAX_TRANSACTIONS_BETWEEN_SPACE_CHECKS;
import static org.briarproject.db.DatabaseConstants.MIN_FREE_SPACE;
/**
* An implementation of DatabaseComponent using reentrant read-write locks.
@@ -85,15 +76,12 @@ import static org.briarproject.db.DatabaseConstants.MIN_FREE_SPACE;
* writers to starve. LockFairnessTest can be used to test whether this
* implementation is safe on a given JVM.
*/
class DatabaseComponentImpl<T> implements DatabaseComponent,
DatabaseCleaner.Callback {
class DatabaseComponentImpl<T> implements DatabaseComponent {
private static final Logger LOG =
Logger.getLogger(DatabaseComponentImpl.class.getName());
private static final int MS_BETWEEN_SWEEPS = 10 * 1000; // 10 seconds
private final Database<T> db;
private final DatabaseCleaner cleaner;
private final EventBus eventBus;
private final ShutdownManager shutdown;
@@ -104,10 +92,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent,
private int shutdownHandle = -1; // Locking: lock.writeLock
@Inject
DatabaseComponentImpl(Database<T> db, DatabaseCleaner cleaner,
EventBus eventBus, ShutdownManager shutdown) {
DatabaseComponentImpl(Database<T> db, EventBus eventBus,
ShutdownManager shutdown) {
this.db = db;
this.cleaner = cleaner;
this.eventBus = eventBus;
this.shutdown = shutdown;
}
@@ -135,7 +122,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent,
if (open) throw new IllegalStateException();
open = true;
boolean reopened = db.open();
cleaner.startCleaning(this, MS_BETWEEN_SWEEPS);
shutdownHandle = shutdown.addShutdownHook(shutdownHook);
return reopened;
} finally {
@@ -150,7 +136,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent,
open = false;
if (shutdownHandle != -1)
shutdown.removeShutdownHook(shutdownHandle);
cleaner.stopCleaning();
db.close();
} finally {
lock.writeLock().unlock();
@@ -439,45 +424,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent,
return Collections.unmodifiableList(messages);
}
public RetentionAck generateRetentionAck(ContactId c) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
RetentionAck a = db.getRetentionAck(txn, c);
db.commitTransaction(txn);
return a;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public RetentionUpdate generateRetentionUpdate(ContactId c, int maxLatency)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
RetentionUpdate u = db.getRetentionUpdate(txn, c, maxLatency);
db.commitTransaction(txn);
return u;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public SubscriptionAck generateSubscriptionAck(ContactId c)
throws DbException {
lock.writeLock().lock();
@@ -1168,47 +1114,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent,
if (requested) eventBus.broadcast(new MessageRequestedEvent(c));
}
public void receiveRetentionAck(ContactId c, RetentionAck a)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setRetentionUpdateAcked(txn, c, a.getVersion());
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void receiveRetentionUpdate(ContactId c, RetentionUpdate u)
throws DbException {
boolean updated;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
long retention = u.getRetentionTime(), version = u.getVersion();
updated = db.setRetentionTime(txn, c, retention, version);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (updated) eventBus.broadcast(new RemoteRetentionTimeUpdatedEvent(c));
}
public void receiveSubscriptionAck(ContactId c, SubscriptionAck a)
throws DbException {
lock.writeLock().lock();
@@ -1559,57 +1464,4 @@ class DatabaseComponentImpl<T> implements DatabaseComponent,
lock.writeLock().unlock();
}
}
public void checkFreeSpaceAndClean() throws DbException {
long freeSpace = db.getFreeSpace();
if (LOG.isLoggable(INFO)) LOG.info(freeSpace + " bytes free space");
while (freeSpace < MIN_FREE_SPACE) {
boolean expired = expireMessages(BYTES_PER_SWEEP);
if (freeSpace < CRITICAL_FREE_SPACE && !expired) {
// FIXME: Work out what to do here
throw new Error("Disk space is critically low");
}
Thread.yield();
freeSpace = db.getFreeSpace();
}
}
/**
* Removes the oldest messages from the database, with a total size less
* than or equal to the given size, and returns true if any messages were
* removed.
*/
private boolean expireMessages(int size) throws DbException {
Collection<MessageId> expired;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
expired = db.getOldMessages(txn, size);
if (!expired.isEmpty()) {
for (MessageId m : expired) db.removeMessage(txn, m);
db.incrementRetentionVersions(txn);
if (LOG.isLoggable(INFO))
LOG.info("Expired " + expired.size() + " messages");
}
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (expired.isEmpty()) return false;
eventBus.broadcast(new MessageExpiredEvent());
return true;
}
public boolean shouldCheckFreeSpace() {
if (db.getTransactionCount() > MAX_TRANSACTIONS_BETWEEN_SPACE_CHECKS) {
db.resetTransactionCount();
return true;
}
return false;
}
}

View File

@@ -40,20 +40,18 @@ public class DatabaseModule extends AbstractModule {
@Override
protected void configure() {
bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class);
// Nothing to bind
}
@Provides
@Provides @Singleton
Database<Connection> getDatabase(DatabaseConfig config) {
return new H2Database(config, new SystemClock());
}
@Provides @Singleton
DatabaseComponent getDatabaseComponent(Database<Connection> db,
DatabaseCleaner cleaner, EventBus eventBus,
ShutdownManager shutdown) {
return new DatabaseComponentImpl<Connection>(db, cleaner, eventBus,
shutdown);
EventBus eventBus, ShutdownManager shutdown) {
return new DatabaseComponentImpl<Connection>(db, eventBus, shutdown);
}
@Provides @Singleton @DatabaseExecutor

View File

@@ -18,8 +18,6 @@ import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageHeader;
import org.briarproject.api.sync.MessageHeader.State;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -58,7 +56,6 @@ import static org.briarproject.api.Author.Status.ANONYMOUS;
import static org.briarproject.api.Author.Status.UNKNOWN;
import static org.briarproject.api.Author.Status.VERIFIED;
import static org.briarproject.api.sync.MessagingConstants.MAX_SUBSCRIPTIONS;
import static org.briarproject.api.sync.MessagingConstants.RETENTION_GRANULARITY;
import static org.briarproject.db.ExponentialBackoff.calculateExpiry;
/**
@@ -200,21 +197,6 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_STATUSES_BY_CONTACT =
"CREATE INDEX statusesByContact ON statuses (contactId)";
private static final String CREATE_RETENTION_VERSIONS =
"CREATE TABLE retentionVersions"
+ " (contactId INT NOT NULL,"
+ " retention BIGINT NOT NULL,"
+ " localVersion BIGINT NOT NULL,"
+ " localAcked BIGINT NOT NULL,"
+ " remoteVersion BIGINT NOT NULL,"
+ " remoteAcked BOOLEAN NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " txCount INT NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_TRANSPORTS =
"CREATE TABLE transports"
+ " (transportId VARCHAR NOT NULL,"
@@ -416,7 +398,6 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(insertTypeNames(CREATE_STATUSES));
s.executeUpdate(INDEX_STATUSES_BY_MESSAGE);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT);
s.executeUpdate(insertTypeNames(CREATE_RETENTION_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORTS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_CONFIGS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_PROPS));
@@ -632,16 +613,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
ps.close();
}
// Create a retention version row
sql = "INSERT INTO retentionVersions (contactId, retention,"
+ " localVersion, localAcked, remoteVersion, remoteAcked,"
+ " expiry, txCount)"
+ " VALUES (?, 0, 1, 0, 0, TRUE, 0, 0)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
// Create a group version row
sql = "INSERT INTO groupVersions (contactId, localVersion,"
+ " localAcked, remoteVersion, remoteAcked, expiry,"
@@ -1712,13 +1683,10 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId"
+ " JOIN retentionVersions AS rv"
+ " ON cg.contactId = rv.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND timestamp >= retention"
+ " AND seen = FALSE AND requested = FALSE"
+ " AND s.expiry < ?"
+ " ORDER BY timestamp DESC LIMIT ?";
@@ -1775,13 +1743,10 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId"
+ " JOIN retentionVersions AS rv"
+ " ON cg.contactId = rv.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND timestamp >= retention"
+ " AND seen = FALSE"
+ " AND s.expiry < ?"
+ " ORDER BY timestamp DESC";
@@ -1807,33 +1772,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<MessageId> getOldMessages(Connection txn, int capacity)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT length, messageId FROM messages"
+ " ORDER BY timestamp";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
int total = 0;
while (rs.next()) {
int length = rs.getInt(1);
if (total + length > capacity) break;
ids.add(new MessageId(rs.getBytes(2)));
total += length;
}
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public MessageId getParent(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -1954,13 +1892,10 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId"
+ " JOIN retentionVersions AS rv"
+ " ON cg.contactId = rv.contactId"
+ " JOIN statuses AS s"
+ " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND timestamp >= retention"
+ " AND seen = FALSE AND requested = TRUE"
+ " AND s.expiry < ?"
+ " ORDER BY timestamp DESC";
@@ -1986,94 +1921,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public RetentionAck getRetentionAck(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT remoteVersion FROM retentionVersions"
+ " WHERE contactId = ? AND remoteAcked = FALSE";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if (!rs.next()) {
rs.close();
ps.close();
return null;
}
long version = rs.getLong(1);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
sql = "UPDATE retentionVersions SET remoteAcked = TRUE"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
return new RetentionAck(version);
} catch (SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c,
int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT localVersion, txCount"
+ " FROM retentionVersions"
+ " WHERE contactId = ?"
+ " AND localVersion > localAcked"
+ " AND expiry < ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, now);
rs = ps.executeQuery();
if (!rs.next()) {
rs.close();
ps.close();
return null;
}
long version = rs.getLong(1);
int txCount = rs.getInt(2);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
sql = "SELECT timestamp FROM messages AS m"
+ " ORDER BY timestamp LIMIT 1";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
long retention = 0;
if (rs.next()) {
retention = rs.getLong(1);
retention -= retention % RETENTION_GRANULARITY;
}
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
sql = "UPDATE retentionVersions"
+ " SET expiry = ?, txCount = txCount + 1"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, calculateExpiry(now, maxLatency, txCount));
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
return new RetentionUpdate(retention, version);
} catch (SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public Settings getSettings(Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -2474,19 +2321,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void incrementRetentionVersions(Connection txn) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE retentionVersions"
+ " SET localVersion = localVersion + 1, expiry = 0";
ps = txn.prepareStatement(sql);
ps.executeUpdate();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void lowerAckFlag(Connection txn, ContactId c,
Collection<MessageId> acked) throws DbException {
PreparedStatement ps = null;
@@ -3195,49 +3029,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public boolean setRetentionTime(Connection txn, ContactId c, long retention,
long version) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE retentionVersions SET retention = ?,"
+ " remoteVersion = ?, remoteAcked = FALSE"
+ " WHERE contactId = ? AND remoteVersion < ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, retention);
ps.setLong(2, version);
ps.setInt(3, c.getInt());
ps.setLong(4, version);
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
return affected == 1;
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setRetentionUpdateAcked(Connection txn, ContactId c,
long version) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE retentionVersions SET localAcked = ?"
+ " WHERE contactId = ?"
+ " AND localAcked < ? AND localVersion >= ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, version);
ps.setInt(2, c.getInt());
ps.setLong(3, version);
ps.setLong(4, version);
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setSubscriptionUpdateAcked(Connection txn, ContactId c,
long version) throws DbException {
PreparedStatement ps = null;

View File

@@ -11,11 +11,9 @@ import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageExpiredEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.RemoteRetentionTimeUpdatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.event.ShutdownEvent;
@@ -25,8 +23,6 @@ import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -46,7 +42,7 @@ import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
/**
* An outgoing {@link MessagingSession
* An outgoing {@link org.briarproject.api.sync.MessagingSession
* MessagingSession} suitable for duplex transports. The session offers
* messages before sending them, keeps its output stream open when there are no
* packets to send, and reacts to events that make packets available to send.
@@ -73,10 +69,6 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
private final PacketWriter packetWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
// The following must only be accessed on the writer thread
private long nextKeepalive = 0, nextRetxQuery = 0;
private boolean dataToFlush = true;
private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
@@ -103,15 +95,14 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
dbExecutor.execute(new GenerateRequest());
long now = clock.currentTimeMillis();
nextKeepalive = now + maxIdleTime;
nextRetxQuery = now + RETX_QUERY_INTERVAL;
long nextKeepalive = now + maxIdleTime;
long nextRetxQuery = now + RETX_QUERY_INTERVAL;
boolean dataToFlush = true;
// Write packets until interrupted
try {
while (!interrupted) {
@@ -134,7 +125,6 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
// Check for retransmittable packets
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
nextRetxQuery = now + RETX_QUERY_INTERVAL;
@@ -173,8 +163,6 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageAddedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if (e instanceof MessageExpiredEvent) {
dbExecutor.execute(new GenerateRetentionUpdate());
} else if (e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
@@ -193,11 +181,6 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
} else if (e instanceof MessageToRequestEvent) {
if (((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest());
} else if (e instanceof RemoteRetentionTimeUpdatedEvent) {
RemoteRetentionTimeUpdatedEvent r =
(RemoteRetentionTimeUpdatedEvent) e;
if (r.getContactId().equals(contactId))
dbExecutor.execute(new GenerateRetentionAck());
} else if (e instanceof RemoteSubscriptionsUpdatedEvent) {
RemoteSubscriptionsUpdatedEvent r =
(RemoteSubscriptionsUpdatedEvent) e;
@@ -360,77 +343,6 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
}
}
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionAck a = db.generateRetentionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if (a != null) writerTasks.add(new WriteRetentionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements ThrowingRunnable<IOException> {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
}
}
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if (u != null) writerTasks.add(new WriteRetentionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate
implements ThrowingRunnable<IOException> {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {

View File

@@ -18,8 +18,6 @@ import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -34,7 +32,7 @@ import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
/**
* An incoming {@link MessagingSession
* An incoming {@link org.briarproject.api.sync.MessagingSession
* MessagingSession}.
*/
class IncomingSession implements MessagingSession, EventListener {
@@ -83,12 +81,6 @@ class IncomingSession implements MessagingSession, EventListener {
} else if (packetReader.hasRequest()) {
Request r = packetReader.readRequest();
dbExecutor.execute(new ReceiveRequest(r));
} else if (packetReader.hasRetentionAck()) {
RetentionAck a = packetReader.readRetentionAck();
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if (packetReader.hasRetentionUpdate()) {
RetentionUpdate u = packetReader.readRetentionUpdate();
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if (packetReader.hasSubscriptionAck()) {
SubscriptionAck a = packetReader.readSubscriptionAck();
dbExecutor.execute(new ReceiveSubscriptionAck(a));
@@ -218,42 +210,6 @@ class IncomingSession implements MessagingSession, EventListener {
}
}
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
private ReceiveRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveRetentionAck(contactId, ack);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private ReceiveRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveRetentionUpdate(contactId, update);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;

View File

@@ -12,8 +12,6 @@ import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -40,8 +38,6 @@ import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.MESSAGE;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.RETENTION_ACK;
import static org.briarproject.api.sync.PacketTypes.RETENTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
@@ -213,52 +209,6 @@ class PacketReaderImpl implements PacketReader {
return new Request(Collections.unmodifiableList(requested));
}
public boolean hasRetentionAck() throws IOException {
return !eof() && header[1] == RETENTION_ACK;
}
public RetentionAck readRetentionAck() throws IOException {
if (!hasRetentionAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the version
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the retention ack
return new RetentionAck(version);
}
public boolean hasRetentionUpdate() throws IOException {
return !eof() && header[1] == RETENTION_UPDATE;
}
public RetentionUpdate readRetentionUpdate() throws IOException {
if (!hasRetentionUpdate()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
Reader r = readerFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the retention time and version
long retention = r.readInteger();
if (retention < 0) throw new FormatException();
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the retention update
return new RetentionUpdate(retention, version);
}
public boolean hasSubscriptionAck() throws IOException {
return !eof() && header[1] == SUBSCRIPTION_ACK;
}

View File

@@ -9,8 +9,6 @@ import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.PacketTypes;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -30,8 +28,6 @@ import static org.briarproject.api.sync.MessagingConstants.PROTOCOL_VERSION;
import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.RETENTION_ACK;
import static org.briarproject.api.sync.PacketTypes.RETENTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
@@ -120,25 +116,6 @@ class PacketWriterImpl implements PacketWriter {
writePacket(REQUEST);
}
public void writeRetentionAck(RetentionAck a) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeInteger(a.getVersion());
w.writeListEnd();
writePacket(RETENTION_ACK);
}
public void writeRetentionUpdate(RetentionUpdate u) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);
w.writeListStart();
w.writeInteger(u.getRetentionTime());
w.writeInteger(u.getVersion());
w.writeListEnd();
writePacket(RETENTION_UPDATE);
}
public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
assert payload.size() == 0;
Writer w = writerFactory.createWriter(payload);

View File

@@ -13,8 +13,6 @@ import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.MessagingSession;
import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.RetentionAck;
import org.briarproject.api.sync.RetentionUpdate;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
@@ -33,7 +31,7 @@ import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.MessagingConstants.MAX_PAYLOAD_LENGTH;
/**
* An outgoing {@link MessagingSession
* An outgoing {@link org.briarproject.api.sync.MessagingSession
* MessagingSession} suitable for simplex transports. The session sends
* messages without offering them, and closes its output stream when there are
* no more packets to send.
@@ -70,7 +68,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
this.transportId = transportId;
this.maxLatency = maxLatency;
this.packetWriter = packetWriter;
outstandingQueries = new AtomicInteger(8); // One per type of packet
outstandingQueries = new AtomicInteger(6); // One per type of packet
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
@@ -82,8 +80,6 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionAck());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
// Write packets until interrupted or no more packets to write
@@ -196,79 +192,6 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
}
}
// This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionAck a = db.generateRetentionAck(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteRetentionAck(a));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteRetentionAck implements ThrowingRunnable<IOException> {
private final RetentionAck ack;
private WriteRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionAck(ack);
LOG.info("Sent retention ack");
dbExecutor.execute(new GenerateRetentionAck());
}
}
// This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
if (interrupted) return;
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if (u == null) decrementOutstandingQueries();
else writerTasks.add(new WriteRetentionUpdate(u));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteRetentionUpdate
implements ThrowingRunnable<IOException> {
private final RetentionUpdate update;
private WriteRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() throws IOException {
if (interrupted) return;
packetWriter.writeRetentionUpdate(update);
LOG.info("Sent retention update");
dbExecutor.execute(new GenerateRetentionUpdate());
}
}
// This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {