Use 'retention' rather than 'expiry' to describe DB's retention period.

This will avoid a name clash when retransmission is implemented.
This commit is contained in:
akwizgran
2013-01-29 14:33:43 +00:00
parent 3e1c41c62f
commit c3c349970b
13 changed files with 360 additions and 357 deletions

View File

@@ -11,8 +11,8 @@ import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck; import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -98,24 +98,24 @@ public interface DatabaseComponent {
Collection<byte[]> generateBatch(ContactId c, int maxLength, Collection<byte[]> generateBatch(ContactId c, int maxLength,
Collection<MessageId> requested) throws DbException; Collection<MessageId> requested) throws DbException;
/**
* Generates an expiry ack for the given contact. Returns null if no ack
* is due.
*/
ExpiryAck generateExpiryAck(ContactId c) throws DbException;
/**
* Generates an expiry update for the given contact. Returns null if no
* update is due.
*/
ExpiryUpdate generateExpiryUpdate(ContactId c) throws DbException;
/** /**
* Generates an offer for the given contact. Returns null if there are no * Generates an offer for the given contact. Returns null if there are no
* messages to offer. * messages to offer.
*/ */
Offer generateOffer(ContactId c, int maxMessages) throws DbException; Offer generateOffer(ContactId c, int maxMessages) throws DbException;
/**
* Generates a retention ack for the given contact. Returns null if no ack
* is due.
*/
RetentionAck generateRetentionAck(ContactId c) throws DbException;
/**
* Generates a retention update for the given contact. Returns null if no
* update is due.
*/
RetentionUpdate generateRetentionUpdate(ContactId c) throws DbException;
/** /**
* Generates a subscription ack for the given contact. Returns null if no * Generates a subscription ack for the given contact. Returns null if no
* ack is due. * ack is due.
@@ -200,12 +200,6 @@ public interface DatabaseComponent {
/** Processes an ack from the given contact. */ /** Processes an ack from the given contact. */
void receiveAck(ContactId c, Ack a) throws DbException; void receiveAck(ContactId c, Ack a) throws DbException;
/** Processes an expiry ack from the given contact. */
void receiveExpiryAck(ContactId c, ExpiryAck a) throws DbException;
/** Processes an expiry update from the given contact. */
void receiveExpiryUpdate(ContactId c, ExpiryUpdate u) throws DbException;
/** Processes a message from the given contact. */ /** Processes a message from the given contact. */
void receiveMessage(ContactId c, Message m) throws DbException; void receiveMessage(ContactId c, Message m) throws DbException;
@@ -219,6 +213,13 @@ public interface DatabaseComponent {
*/ */
Request receiveOffer(ContactId c, Offer o) throws DbException; Request receiveOffer(ContactId c, Offer o) throws DbException;
/** Processes a retention ack from the given contact. */
void receiveRetentionAck(ContactId c, RetentionAck a) throws DbException;
/** Processes a retention update from the given contact. */
void receiveRetentionUpdate(ContactId c, RetentionUpdate u)
throws DbException;
/** Processes a subscription ack from the given contact. */ /** Processes a subscription ack from the given contact. */
void receiveSubscriptionAck(ContactId c, SubscriptionAck a) void receiveSubscriptionAck(ContactId c, SubscriptionAck a)
throws DbException; throws DbException;

View File

@@ -1,23 +0,0 @@
package net.sf.briar.api.protocol;
/**
* A packet updating the recipient's view of the expiry time of the sender's
* database.
*/
public class ExpiryUpdate {
private final long expiry, version;
public ExpiryUpdate(long expiry, long version) {
this.expiry = expiry;
this.version = version;
}
public long getExpiryTime() {
return expiry;
}
public long getVersionNumber() {
return version;
}
}

View File

@@ -9,12 +9,6 @@ public interface ProtocolReader {
boolean hasAck() throws IOException; boolean hasAck() throws IOException;
Ack readAck() throws IOException; Ack readAck() throws IOException;
boolean hasExpiryAck() throws IOException;
ExpiryAck readExpiryAck() throws IOException;
boolean hasExpiryUpdate() throws IOException;
ExpiryUpdate readExpiryUpdate() throws IOException;
boolean hasMessage() throws IOException; boolean hasMessage() throws IOException;
UnverifiedMessage readMessage() throws IOException; UnverifiedMessage readMessage() throws IOException;
@@ -24,6 +18,12 @@ public interface ProtocolReader {
boolean hasRequest() throws IOException; boolean hasRequest() throws IOException;
Request readRequest() throws IOException; Request readRequest() throws IOException;
boolean hasRetentionAck() throws IOException;
RetentionAck readRetentionAck() throws IOException;
boolean hasRetentionUpdate() throws IOException;
RetentionUpdate readRetentionUpdate() throws IOException;
boolean hasSubscriptionAck() throws IOException; boolean hasSubscriptionAck() throws IOException;
SubscriptionAck readSubscriptionAck() throws IOException; SubscriptionAck readSubscriptionAck() throws IOException;

View File

@@ -10,16 +10,16 @@ public interface ProtocolWriter {
void writeAck(Ack a) throws IOException; void writeAck(Ack a) throws IOException;
void writeExpiryAck(ExpiryAck a) throws IOException;
void writeExpiryUpdate(ExpiryUpdate e) throws IOException;
void writeMessage(byte[] raw) throws IOException; void writeMessage(byte[] raw) throws IOException;
void writeOffer(Offer o) throws IOException; void writeOffer(Offer o) throws IOException;
void writeRequest(Request r) throws IOException; void writeRequest(Request r) throws IOException;
void writeRetentionAck(RetentionAck a) throws IOException;
void writeRetentionUpdate(RetentionUpdate u) throws IOException;
void writeSubscriptionAck(SubscriptionAck a) throws IOException; void writeSubscriptionAck(SubscriptionAck a) throws IOException;
void writeSubscriptionUpdate(SubscriptionUpdate u) throws IOException; void writeSubscriptionUpdate(SubscriptionUpdate u) throws IOException;

View File

@@ -1,11 +1,11 @@
package net.sf.briar.api.protocol; package net.sf.briar.api.protocol;
/** A packet acknowledging a (@link ExpiryUpdate} */ /** A packet acknowledging a (@link RetentionUpdate} */
public class ExpiryAck { public class RetentionAck {
private final long version; private final long version;
public ExpiryAck(long version) { public RetentionAck(long version) {
this.version = version; this.version = version;
} }

View File

@@ -0,0 +1,23 @@
package net.sf.briar.api.protocol;
/**
* A packet updating the recipient's view of the retention time of the sender's
* database.
*/
public class RetentionUpdate {
private final long retention, version;
public RetentionUpdate(long retention, long version) {
this.retention = retention;
this.version = version;
}
public long getRetentionTime() {
return retention;
}
public long getVersionNumber() {
return version;
}
}

View File

@@ -6,11 +6,11 @@ public interface Types {
int AUTHOR = 0; int AUTHOR = 0;
int GROUP = 1; int GROUP = 1;
int ACK = 2; int ACK = 2;
int EXPIRY_ACK = 3; int MESSAGE = 3;
int EXPIRY_UPDATE = 4; int OFFER = 4;
int MESSAGE = 5; int REQUEST = 5;
int OFFER = 6; int RETENTION_ACK = 6;
int REQUEST = 7; int RETENTION_UPDATE = 7;
int SUBSCRIPTION_ACK = 8; int SUBSCRIPTION_ACK = 8;
int SUBSCRIPTION_UPDATE = 9; int SUBSCRIPTION_UPDATE = 9;
int TRANSPORT_ACK = 10; int TRANSPORT_ACK = 10;

View File

@@ -11,8 +11,8 @@ import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader; import net.sf.briar.api.db.MessageHeader;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck; import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -36,9 +36,9 @@ import net.sf.briar.api.transport.TemporarySecret;
* deadlock, locks must be acquired in the following (alphabetical) order: * deadlock, locks must be acquired in the following (alphabetical) order:
* <ul> * <ul>
* <li> contact * <li> contact
* <li> expiry
* <li> message * <li> message
* <li> rating * <li> rating
* <li> retention
* <li> subscription * <li> subscription
* <li> transport * <li> transport
* <li> window * <li> window
@@ -208,21 +208,6 @@ interface Database<T> {
*/ */
Collection<ContactTransport> getContactTransports(T txn) throws DbException; Collection<ContactTransport> getContactTransports(T txn) throws DbException;
/**
* Returns an expiry ack for the given contact, or null if no ack is due.
* <p>
* Locking: contact read, expiry write.
*/
ExpiryAck getExpiryAck(T txn, ContactId c) throws DbException;
/**
* Returns an expiry update for the given contact, or null if no update is
* due.
* <p>
* Locking: contact read, expiry write.
*/
ExpiryUpdate getExpiryUpdate(T txn, ContactId c) throws DbException;
/** /**
* Returns the amount of free storage space available to the database, in * Returns the amount of free storage space available to the database, in
* bytes. This is based on the minimum of the space available on the device * bytes. This is based on the minimum of the space available on the device
@@ -346,6 +331,21 @@ interface Database<T> {
Map<ContactId, TransportProperties> getRemoteProperties(T txn, Map<ContactId, TransportProperties> getRemoteProperties(T txn,
TransportId t) throws DbException; TransportId t) throws DbException;
/**
* Returns a retention ack for the given contact, or null if no ack is due.
* <p>
* Locking: contact read, retention write.
*/
RetentionAck getRetentionAck(T txn, ContactId c) throws DbException;
/**
* Returns a retention update for the given contact, or null if no update
* is due.
* <p>
* Locking: contact read, retention write.
*/
RetentionUpdate getRetentionUpdate(T txn, ContactId c) throws DbException;
/** /**
* Returns all temporary secrets. * Returns all temporary secrets.
* <p> * <p>
@@ -459,12 +459,12 @@ interface Database<T> {
long period) throws DbException; long period) throws DbException;
/** /**
* Increments the expiry versions for all contacts to indicate that the * Increments the retention time versions for all contacts to indicate that
* database's expiry time has changed and expiry updates should be sent. * the database's retention time has changed and updates should be sent.
* <p> * <p>
* Locking: contact read, expiry write. * Locking: contact read, retention write.
*/ */
void incrementExpiryVersions(T txn) throws DbException; void incrementRetentionVersions(T txn) throws DbException;
/** /**
* Merges the given configuration with the existing configuration for the * Merges the given configuration with the existing configuration for the
@@ -549,16 +549,6 @@ interface Database<T> {
void setConnectionWindow(T txn, ContactId c, TransportId t, long period, void setConnectionWindow(T txn, ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException; long centre, byte[] bitmap) throws DbException;
/**
* Sets the expiry time of the given contact's database, unless an update
* with an equal or higher version number has already been received from
* the contact.
* <p>
* Locking: contact read, expiry write.
*/
void setExpiryTime(T txn, ContactId c, long expiry, long version)
throws DbException;
/** /**
* Sets the user's rating for the given author. * Sets the user's rating for the given author.
* <p> * <p>
@@ -585,6 +575,16 @@ interface Database<T> {
void setRemoteProperties(T txn, ContactId c, TransportUpdate u) void setRemoteProperties(T txn, ContactId c, TransportUpdate u)
throws DbException; throws DbException;
/**
* Sets the retention time of the given contact's database, unless an
* update with an equal or higher version number has already been received
* from the contact.
* <p>
* Locking: contact read, retention write.
*/
void setRetentionTime(T txn, ContactId c, long retention, long version)
throws DbException;
/** /**
* Sets the sendability score of the given message. * Sets the sendability score of the given message.
* <p> * <p>
@@ -630,12 +630,12 @@ interface Database<T> {
throws DbException; throws DbException;
/** /**
* Records an expiry ack from the given contact for the given version * Records a retention ack from the given contact for the given version
* unless the contact has already acked an equal or higher version. * unless the contact has already acked an equal or higher version.
* <p> * <p>
* Locking: contact read, expiry write. * Locking: contact read, retention write.
*/ */
void setExpiryUpdateAcked(T txn, ContactId c, long version) void setRetentionUpdateAcked(T txn, ContactId c, long version)
throws DbException; throws DbException;
/** /**

View File

@@ -46,8 +46,8 @@ import net.sf.briar.api.db.event.TransportRemovedEvent;
import net.sf.briar.api.lifecycle.ShutdownManager; import net.sf.briar.api.lifecycle.ShutdownManager;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck; import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -83,12 +83,12 @@ DatabaseCleaner.Callback {
private final ReentrantReadWriteLock contactLock = private final ReentrantReadWriteLock contactLock =
new ReentrantReadWriteLock(true); new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock expiryLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock messageLock = private final ReentrantReadWriteLock messageLock =
new ReentrantReadWriteLock(true); new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock ratingLock = private final ReentrantReadWriteLock ratingLock =
new ReentrantReadWriteLock(true); new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock retentionLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock subscriptionLock = private final ReentrantReadWriteLock subscriptionLock =
new ReentrantReadWriteLock(true); new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock transportLock = private final ReentrantReadWriteLock transportLock =
@@ -594,54 +594,6 @@ DatabaseCleaner.Callback {
return Collections.unmodifiableList(messages); return Collections.unmodifiableList(messages);
} }
public ExpiryAck generateExpiryAck(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
ExpiryAck a = db.getExpiryAck(txn, c);
db.commitTransaction(txn);
return a;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public ExpiryUpdate generateExpiryUpdate(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
ExpiryUpdate e = db.getExpiryUpdate(txn, c);
db.commitTransaction(txn);
return e;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public Offer generateOffer(ContactId c, int maxMessages) public Offer generateOffer(ContactId c, int maxMessages)
throws DbException { throws DbException {
Collection<MessageId> offered; Collection<MessageId> offered;
@@ -668,6 +620,55 @@ DatabaseCleaner.Callback {
return new Offer(offered); return new Offer(offered);
} }
public RetentionAck generateRetentionAck(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
retentionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
RetentionAck a = db.getRetentionAck(txn, c);
db.commitTransaction(txn);
return a;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
retentionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public RetentionUpdate generateRetentionUpdate(ContactId c)
throws DbException {
contactLock.readLock().lock();
try {
retentionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
RetentionUpdate u = db.getRetentionUpdate(txn, c);
db.commitTransaction(txn);
return u;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
retentionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public SubscriptionAck generateSubscriptionAck(ContactId c) public SubscriptionAck generateSubscriptionAck(ContactId c)
throws DbException { throws DbException {
contactLock.readLock().lock(); contactLock.readLock().lock();
@@ -1090,54 +1091,6 @@ DatabaseCleaner.Callback {
} }
} }
public void receiveExpiryAck(ContactId c, ExpiryAck a) throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setExpiryUpdateAcked(txn, c, a.getVersionNumber());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveExpiryUpdate(ContactId c, ExpiryUpdate u)
throws DbException {
contactLock.readLock().lock();
try {
expiryLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setExpiryTime(txn, c, u.getExpiryTime(),
u.getVersionNumber());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
expiryLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveMessage(ContactId c, Message m) throws DbException { public void receiveMessage(ContactId c, Message m) throws DbException {
boolean added = false; boolean added = false;
contactLock.readLock().lock(); contactLock.readLock().lock();
@@ -1226,6 +1179,55 @@ DatabaseCleaner.Callback {
return new Request(request, offered.size()); return new Request(request, offered.size());
} }
public void receiveRetentionAck(ContactId c, RetentionAck a)
throws DbException {
contactLock.readLock().lock();
try {
retentionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setRetentionUpdateAcked(txn, c, a.getVersionNumber());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
retentionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveRetentionUpdate(ContactId c, RetentionUpdate u)
throws DbException {
contactLock.readLock().lock();
try {
retentionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setRetentionTime(txn, c, u.getRetentionTime(),
u.getVersionNumber());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
retentionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveSubscriptionAck(ContactId c, SubscriptionAck a) public void receiveSubscriptionAck(ContactId c, SubscriptionAck a)
throws DbException { throws DbException {
contactLock.readLock().lock(); contactLock.readLock().lock();
@@ -1606,9 +1608,9 @@ DatabaseCleaner.Callback {
boolean removed = false; boolean removed = false;
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
expiryLock.writeLock().lock(); messageLock.writeLock().lock();
try { try {
messageLock.writeLock().lock(); retentionLock.writeLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
try { try {
@@ -1616,7 +1618,7 @@ DatabaseCleaner.Callback {
db.getOldMessages(txn, size); db.getOldMessages(txn, size);
if(!old.isEmpty()) { if(!old.isEmpty()) {
for(MessageId m : old) removeMessage(txn, m); for(MessageId m : old) removeMessage(txn, m);
db.incrementExpiryVersions(txn); db.incrementRetentionVersions(txn);
removed = true; removed = true;
} }
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -1625,10 +1627,10 @@ DatabaseCleaner.Callback {
throw e; throw e;
} }
} finally { } finally {
messageLock.writeLock().unlock(); retentionLock.writeLock().unlock();
} }
} finally { } finally {
expiryLock.writeLock().unlock(); messageLock.writeLock().unlock();
} }
} finally { } finally {
contactLock.readLock().unlock(); contactLock.readLock().unlock();

View File

@@ -40,7 +40,7 @@ interface DatabaseConstants {
* The timestamp of the oldest message in the database is rounded using * The timestamp of the oldest message in the database is rounded using
* this modulus to avoid revealing the presence of any particular message. * this modulus to avoid revealing the presence of any particular message.
*/ */
long EXPIRY_MODULUS = 60L * 60L * 1000L; // 1 hour long RETENTION_MODULUS = 60L * 60L * 1000L; // 1 hour
/** /**
* The time in milliseconds after which a subscription or transport update * The time in milliseconds after which a subscription or transport update

View File

@@ -3,7 +3,7 @@ package net.sf.briar.db;
import static java.sql.Types.BINARY; import static java.sql.Types.BINARY;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static net.sf.briar.db.DatabaseConstants.EXPIRY_MODULUS; import static net.sf.briar.db.DatabaseConstants.RETENTION_MODULUS;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@@ -31,8 +31,8 @@ import net.sf.briar.api.db.DbClosedException;
import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader; import net.sf.briar.api.db.MessageHeader;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.ExpiryAck; import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -58,20 +58,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " (contactId COUNTER," + " (contactId COUNTER,"
+ " PRIMARY KEY (contactId))"; + " PRIMARY KEY (contactId))";
// Locking: expiry
private static final String CREATE_EXPIRY_VERSIONS =
"CREATE TABLE expiryVersions"
+ " (contactId INT NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " localVersion BIGINT NOT NULL,"
+ " localAcked BIGINT NOT NULL,"
+ " remoteVersion BIGINT NOT NULL,"
+ " remoteAcked BOOLEAN NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: message // Locking: message
private static final String CREATE_MESSAGES = private static final String CREATE_MESSAGES =
"CREATE TABLE messages" "CREATE TABLE messages"
@@ -155,6 +141,20 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " rating SMALLINT NOT NULL," + " rating SMALLINT NOT NULL,"
+ " PRIMARY KEY (authorId))"; + " PRIMARY KEY (authorId))";
// Locking: contact read, retention
private static final String CREATE_RETENTION_VERSIONS =
"CREATE TABLE retentionVersions"
+ " (contactId INT NOT NULL,"
+ " retention BIGINT NOT NULL,"
+ " localVersion BIGINT NOT NULL,"
+ " localAcked BIGINT NOT NULL,"
+ " remoteVersion BIGINT NOT NULL,"
+ " remoteAcked BOOLEAN NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: subscription // Locking: subscription
private static final String CREATE_GROUPS = private static final String CREATE_GROUPS =
"CREATE TABLE groups" "CREATE TABLE groups"
@@ -355,7 +355,6 @@ abstract class JdbcDatabase implements Database<Connection> {
try { try {
s = txn.createStatement(); s = txn.createStatement();
s.executeUpdate(insertTypeNames(CREATE_CONTACTS)); s.executeUpdate(insertTypeNames(CREATE_CONTACTS));
s.executeUpdate(insertTypeNames(CREATE_EXPIRY_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_MESSAGES)); s.executeUpdate(insertTypeNames(CREATE_MESSAGES));
s.executeUpdate(INDEX_MESSAGES_BY_PARENT); s.executeUpdate(INDEX_MESSAGES_BY_PARENT);
s.executeUpdate(INDEX_MESSAGES_BY_AUTHOR); s.executeUpdate(INDEX_MESSAGES_BY_AUTHOR);
@@ -367,6 +366,7 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(INDEX_STATUSES_BY_CONTACT); s.executeUpdate(INDEX_STATUSES_BY_CONTACT);
s.executeUpdate(insertTypeNames(CREATE_FLAGS)); s.executeUpdate(insertTypeNames(CREATE_FLAGS));
s.executeUpdate(insertTypeNames(CREATE_RATINGS)); s.executeUpdate(insertTypeNames(CREATE_RATINGS));
s.executeUpdate(insertTypeNames(CREATE_RETENTION_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_GROUPS)); s.executeUpdate(insertTypeNames(CREATE_GROUPS));
s.executeUpdate(insertTypeNames(CREATE_GROUP_VISIBILITIES)); s.executeUpdate(insertTypeNames(CREATE_GROUP_VISIBILITIES));
s.executeUpdate(insertTypeNames(CREATE_CONTACT_GROUPS)); s.executeUpdate(insertTypeNames(CREATE_CONTACT_GROUPS));
@@ -513,8 +513,8 @@ abstract class JdbcDatabase implements Database<Connection> {
if(rs.next()) throw new DbStateException(); if(rs.next()) throw new DbStateException();
rs.close(); rs.close();
ps.close(); ps.close();
// Create an expiry version row // Create a retention version row
sql = "INSERT INTO expiryVersions (contactId, expiry," sql = "INSERT INTO retentionVersions (contactId, retention,"
+ " localVersion, localAcked, remoteVersion, remoteAcked)" + " localVersion, localAcked, remoteVersion, remoteAcked)"
+ " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE)"; + " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE)";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
@@ -1042,72 +1042,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} else return f.length(); } else return f.length();
} }
public ExpiryAck getExpiryAck(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT remoteVersion FROM expiryVersions"
+ " WHERE contactId = ? AND remoteAcked = FALSE";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) {
rs.close();
ps.close();
return null;
}
long version = rs.getLong(1);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
sql = "UPDATE expiryVersions SET remoteAcked = TRUE"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return new ExpiryAck(version);
} catch(SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public ExpiryUpdate getExpiryUpdate(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT timestamp, localVersion"
+ " FROM messages JOIN expiryVersions"
+ " WHERE contactId = ? AND localVersion > localAcked"
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, 1);
rs = ps.executeQuery();
if(!rs.next()) {
rs.close();
ps.close();
return null;
}
long expiry = rs.getLong(1);
expiry -= expiry % EXPIRY_MODULUS;
long version = rs.getLong(2);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
return new ExpiryUpdate(expiry, version);
} catch(SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public MessageId getGroupMessageParent(Connection txn, MessageId m) public MessageId getGroupMessageParent(Connection txn, MessageId m)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
@@ -1277,14 +1211,14 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN expiryVersions AS ev" + " JOIN retentionVersions AS rv"
+ " ON cg.contactId = ev.contactId" + " ON cg.contactId = rv.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
+ " WHERE m.messageId = ?" + " WHERE m.messageId = ?"
+ " AND cg.contactId = ?" + " AND cg.contactId = ?"
+ " AND timestamp >= expiry" + " AND timestamp >= retention"
+ " AND status = ?" + " AND status = ?"
+ " AND sendability > ZERO()"; + " AND sendability > ZERO()";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
@@ -1382,13 +1316,13 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN expiryVersions AS ev" + " JOIN retentionVersions AS rv"
+ " ON cg.contactId = ev.contactId" + " ON cg.contactId = rv.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?" + " WHERE cg.contactId = ?"
+ " AND timestamp >= expiry" + " AND timestamp >= retention"
+ " AND status = ?" + " AND status = ?"
+ " AND sendability > ZERO()" + " AND sendability > ZERO()"
+ " ORDER BY timestamp DESC LIMIT ?"; + " ORDER BY timestamp DESC LIMIT ?";
@@ -1548,6 +1482,72 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public RetentionAck getRetentionAck(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT remoteVersion FROM retentionVersions"
+ " WHERE contactId = ? AND remoteAcked = FALSE";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if(!rs.next()) {
rs.close();
ps.close();
return null;
}
long version = rs.getLong(1);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
sql = "UPDATE retentionVersions SET remoteAcked = TRUE"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return new RetentionAck(version);
} catch(SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT timestamp, localVersion"
+ " FROM messages JOIN retentionVersions"
+ " WHERE contactId = ? AND localVersion > localAcked"
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, 1);
rs = ps.executeQuery();
if(!rs.next()) {
rs.close();
ps.close();
return null;
}
long retention = rs.getLong(1);
retention -= retention % RETENTION_MODULUS;
long version = rs.getLong(2);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
return new RetentionUpdate(retention, version);
} catch(SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public Collection<TemporarySecret> getSecrets(Connection txn) public Collection<TemporarySecret> getSecrets(Connection txn)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
@@ -1642,13 +1642,13 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN expiryVersions AS ev" + " JOIN retentionVersions AS rv"
+ " ON cg.contactId = ev.contactId" + " ON cg.contactId = rv.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?" + " WHERE cg.contactId = ?"
+ " AND timestamp >= expiry" + " AND timestamp >= retention"
+ " AND status = ?" + " AND status = ?"
+ " AND sendability > ZERO()" + " AND sendability > ZERO()"
+ " ORDER BY timestamp DESC"; + " ORDER BY timestamp DESC";
@@ -1974,13 +1974,13 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN expiryVersions AS ev" + " JOIN retentionVersios AS rv"
+ " ON cg.contactId = ev.contactId" + " ON cg.contactId = rv.contactId"
+ " JOIN statuses AS s" + " JOIN statuses AS s"
+ " ON m.messageId = s.messageId" + " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId" + " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?" + " WHERE cg.contactId = ?"
+ " AND timestamp >= expiry" + " AND timestamp >= retention"
+ " AND status = ?" + " AND status = ?"
+ " AND sendability > ZERO()" + " AND sendability > ZERO()"
+ " LIMIT ?"; + " LIMIT ?";
@@ -2042,10 +2042,10 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public void incrementExpiryVersions(Connection txn) throws DbException { public void incrementRetentionVersions(Connection txn) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
String sql = "UPDATE expiryVersions" String sql = "UPDATE retentionVersions"
+ " SET localVersion = localVersion + ?"; + " SET localVersion = localVersion + ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setInt(1, 1); ps.setInt(1, 1);
@@ -2323,15 +2323,15 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public void setExpiryTime(Connection txn, ContactId c, long expiry, public void setRetentionTime(Connection txn, ContactId c, long retention,
long version) throws DbException { long version) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
String sql = "UPDATE expiryVersions" String sql = "UPDATE retentionVersions SET retention = ?,"
+ " SET expiry = ?, remoteVersion = ?, remoteAcked = FALSE" + " remoteVersion = ?, remoteAcked = FALSE"
+ " WHERE contactId = ? AND remoteVersion < ?"; + " WHERE contactId = ? AND remoteVersion < ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setLong(1, expiry); ps.setLong(1, retention);
ps.setLong(2, version); ps.setLong(2, version);
ps.setInt(3, c.getInt()); ps.setInt(3, c.getInt());
ps.setLong(4, version); ps.setLong(4, version);
@@ -2344,11 +2344,11 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public void setExpiryUpdateAcked(Connection txn, ContactId c, long version) public void setRetentionUpdateAcked(Connection txn, ContactId c,
throws DbException { long version) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
String sql = "UPDATE expiryVersions SET localAcked = ?" String sql = "UPDATE retentionVersions SET localAcked = ?"
+ " WHERE contactId = ?" + " WHERE contactId = ?"
+ " AND localAcked < ? AND localVersion >= ?"; + " AND localAcked < ? AND localVersion >= ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
@@ -2672,11 +2672,11 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv" + " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId" + " ON m.groupId = gv.groupId"
+ " AND cg.contactId = gv.contactId" + " AND cg.contactId = gv.contactId"
+ " JOIN expiryVersions AS ev" + " JOIN retentionVersions AS rv"
+ " ON cg.contactId = ev.contactId" + " ON cg.contactId = rv.contactId"
+ " WHERE messageId = ?" + " WHERE messageId = ?"
+ " AND cg.contactId = ?" + " AND cg.contactId = ?"
+ " AND timestamp >= expiry"; + " AND timestamp >= retention";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes()); ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt()); ps.setInt(2, c.getInt());

View File

@@ -4,8 +4,8 @@ import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PROPERTIES_PER_TRANSPORT; import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PROPERTIES_PER_TRANSPORT;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PROPERTY_LENGTH; import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PROPERTY_LENGTH;
import static net.sf.briar.api.protocol.Types.ACK; import static net.sf.briar.api.protocol.Types.ACK;
import static net.sf.briar.api.protocol.Types.EXPIRY_ACK; import static net.sf.briar.api.protocol.Types.RETENTION_ACK;
import static net.sf.briar.api.protocol.Types.EXPIRY_UPDATE; import static net.sf.briar.api.protocol.Types.RETENTION_UPDATE;
import static net.sf.briar.api.protocol.Types.MESSAGE; import static net.sf.briar.api.protocol.Types.MESSAGE;
import static net.sf.briar.api.protocol.Types.OFFER; import static net.sf.briar.api.protocol.Types.OFFER;
import static net.sf.briar.api.protocol.Types.REQUEST; import static net.sf.briar.api.protocol.Types.REQUEST;
@@ -26,8 +26,8 @@ import net.sf.briar.api.Bytes;
import net.sf.briar.api.FormatException; import net.sf.briar.api.FormatException;
import net.sf.briar.api.TransportProperties; import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.ExpiryAck; import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolReader; import net.sf.briar.api.protocol.ProtocolReader;
@@ -90,30 +90,6 @@ class ProtocolReaderImpl implements ProtocolReader {
return new Ack(Collections.unmodifiableList(acked)); return new Ack(Collections.unmodifiableList(acked));
} }
public boolean hasExpiryAck() throws IOException {
return r.hasStruct(EXPIRY_ACK);
}
public ExpiryAck readExpiryAck() throws IOException {
r.readStructId(EXPIRY_ACK);
long version = r.readInt64();
if(version < 0L) throw new FormatException();
return new ExpiryAck(version);
}
public boolean hasExpiryUpdate() throws IOException {
return r.hasStruct(EXPIRY_UPDATE);
}
public ExpiryUpdate readExpiryUpdate() throws IOException {
r.readStructId(EXPIRY_UPDATE);
long expiry = r.readInt64();
if(expiry < 0L) throw new FormatException();
long version = r.readInt64();
if(version < 0L) throw new FormatException();
return new ExpiryUpdate(expiry, version);
}
public boolean hasMessage() throws IOException { public boolean hasMessage() throws IOException {
return r.hasStruct(MESSAGE); return r.hasStruct(MESSAGE);
} }
@@ -173,6 +149,30 @@ class ProtocolReaderImpl implements ProtocolReader {
return new Request(b, length); return new Request(b, length);
} }
public boolean hasRetentionAck() throws IOException {
return r.hasStruct(RETENTION_ACK);
}
public RetentionAck readRetentionAck() throws IOException {
r.readStructId(RETENTION_ACK);
long version = r.readInt64();
if(version < 0L) throw new FormatException();
return new RetentionAck(version);
}
public boolean hasRetentionUpdate() throws IOException {
return r.hasStruct(RETENTION_UPDATE);
}
public RetentionUpdate readRetentionUpdate() throws IOException {
r.readStructId(RETENTION_UPDATE);
long retention = r.readInt64();
if(retention < 0L) throw new FormatException();
long version = r.readInt64();
if(version < 0L) throw new FormatException();
return new RetentionUpdate(retention, version);
}
public boolean hasSubscriptionAck() throws IOException { public boolean hasSubscriptionAck() throws IOException {
return r.hasStruct(SUBSCRIPTION_ACK); return r.hasStruct(SUBSCRIPTION_ACK);
} }

View File

@@ -2,8 +2,8 @@ package net.sf.briar.protocol;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH; import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import static net.sf.briar.api.protocol.Types.ACK; import static net.sf.briar.api.protocol.Types.ACK;
import static net.sf.briar.api.protocol.Types.EXPIRY_ACK; import static net.sf.briar.api.protocol.Types.RETENTION_ACK;
import static net.sf.briar.api.protocol.Types.EXPIRY_UPDATE; import static net.sf.briar.api.protocol.Types.RETENTION_UPDATE;
import static net.sf.briar.api.protocol.Types.GROUP; import static net.sf.briar.api.protocol.Types.GROUP;
import static net.sf.briar.api.protocol.Types.OFFER; import static net.sf.briar.api.protocol.Types.OFFER;
import static net.sf.briar.api.protocol.Types.REQUEST; import static net.sf.briar.api.protocol.Types.REQUEST;
@@ -17,8 +17,8 @@ import java.io.OutputStream;
import java.util.BitSet; import java.util.BitSet;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.ExpiryAck; import net.sf.briar.api.protocol.RetentionAck;
import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.RetentionUpdate;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.Offer;
@@ -74,19 +74,6 @@ class ProtocolWriterImpl implements ProtocolWriter {
if(flush) out.flush(); if(flush) out.flush();
} }
public void writeExpiryAck(ExpiryAck a) throws IOException {
w.writeStructId(EXPIRY_ACK);
w.writeInt64(a.getVersionNumber());
if(flush) out.flush();
}
public void writeExpiryUpdate(ExpiryUpdate e) throws IOException {
w.writeStructId(EXPIRY_UPDATE);
w.writeInt64(e.getExpiryTime());
w.writeInt64(e.getVersionNumber());
if(flush) out.flush();
}
public void writeMessage(byte[] raw) throws IOException { public void writeMessage(byte[] raw) throws IOException {
out.write(raw); out.write(raw);
if(flush) out.flush(); if(flush) out.flush();
@@ -120,6 +107,19 @@ class ProtocolWriterImpl implements ProtocolWriter {
if(flush) out.flush(); if(flush) out.flush();
} }
public void writeRetentionAck(RetentionAck a) throws IOException {
w.writeStructId(RETENTION_ACK);
w.writeInt64(a.getVersionNumber());
if(flush) out.flush();
}
public void writeRetentionUpdate(RetentionUpdate u) throws IOException {
w.writeStructId(RETENTION_UPDATE);
w.writeInt64(u.getRetentionTime());
w.writeInt64(u.getVersionNumber());
if(flush) out.flush();
}
public void writeSubscriptionAck(SubscriptionAck a) throws IOException { public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
w.writeStructId(SUBSCRIPTION_ACK); w.writeStructId(SUBSCRIPTION_ACK);
w.writeInt64(a.getVersionNumber()); w.writeInt64(a.getVersionNumber());