Database portion of the offer/request/transfer protocol (untested).

This commit is contained in:
akwizgran
2011-07-26 15:40:34 +01:00
parent a86ef2142f
commit 10edc05dff
9 changed files with 436 additions and 19 deletions

View File

@@ -12,10 +12,14 @@ import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.Subscriptions; import net.sf.briar.api.protocol.Subscriptions;
import net.sf.briar.api.protocol.Transports; import net.sf.briar.api.protocol.Transports;
import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.AckWriter;
import net.sf.briar.api.protocol.writers.BatchWriter; import net.sf.briar.api.protocol.writers.BatchWriter;
import net.sf.briar.api.protocol.writers.OfferWriter;
import net.sf.briar.api.protocol.writers.RequestWriter;
import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.SubscriptionWriter;
import net.sf.briar.api.protocol.writers.TransportWriter; import net.sf.briar.api.protocol.writers.TransportWriter;
@@ -25,14 +29,14 @@ import net.sf.briar.api.protocol.writers.TransportWriter;
*/ */
public interface DatabaseComponent { public interface DatabaseComponent {
static final long MEGABYTES = 1024L * 1024L; static final int MEGABYTES = 1024 * 1024;
// FIXME: These should be configurable // FIXME: These should be configurable
static final long MIN_FREE_SPACE = 300L * MEGABYTES; static final long MIN_FREE_SPACE = 300L * MEGABYTES;
static final long CRITICAL_FREE_SPACE = 100L * MEGABYTES; static final long CRITICAL_FREE_SPACE = 100L * MEGABYTES;
static final long MAX_BYTES_BETWEEN_SPACE_CHECKS = 5L * MEGABYTES; static final int MAX_BYTES_BETWEEN_SPACE_CHECKS = 5 * MEGABYTES;
static final long MAX_MS_BETWEEN_SPACE_CHECKS = 60L * 1000L; // 1 min static final long MAX_MS_BETWEEN_SPACE_CHECKS = 60L * 1000L; // 1 min
static final long BYTES_PER_SWEEP = 5L * MEGABYTES; static final int BYTES_PER_SWEEP = 5 * MEGABYTES;
/** /**
* Opens the database. * Opens the database.
@@ -67,6 +71,21 @@ public interface DatabaseComponent {
void generateBatch(ContactId c, BatchWriter b) throws DbException, void generateBatch(ContactId c, BatchWriter b) throws DbException,
IOException; IOException;
/**
* Generates a batch of messages for the given contact from the given
* collection of requested messages, and returns the IDs of the messages
* added to the bacth.
*/
Collection<MessageId> generateBatch(ContactId c, BatchWriter b,
Collection<MessageId> requested) throws DbException, IOException;
/**
* Generates an offer for the given contact and returns the offered
* message IDs.
*/
Collection<MessageId> generateOffer(ContactId c, OfferWriter h)
throws DbException, IOException;
/** Generates a subscription update for the given contact. */ /** Generates a subscription update for the given contact. */
void generateSubscriptions(ContactId c, SubscriptionWriter s) throws void generateSubscriptions(ContactId c, SubscriptionWriter s) throws
DbException, IOException; DbException, IOException;
@@ -96,6 +115,13 @@ public interface DatabaseComponent {
/** Processes a batches of messages from the given contact. */ /** Processes a batches of messages from the given contact. */
void receiveBatch(ContactId c, Batch b) throws DbException; void receiveBatch(ContactId c, Batch b) throws DbException;
/**
* Processes an offer from the given contact and generates a request for
* any messages in the offer that have not been seen.
*/
void receiveOffer(ContactId c, Offer o, RequestWriter r) throws DbException,
IOException;
/** Processes a subscription update from the given contact. */ /** Processes a subscription update from the given contact. */
void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException; void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException;

View File

@@ -0,0 +1,16 @@
package net.sf.briar.api.protocol;
import java.util.Collection;
/** A packet offering the recipient some messages. */
public interface Offer {
/**
* The maximum size of a serialised offer, excluding encryption and
* authentication.
*/
static final int MAX_SIZE = (1024 * 1024) - 100;
/** Returns the message IDs contained in the offer. */
Collection<MessageId> getMessages();
}

View File

@@ -0,0 +1,18 @@
package net.sf.briar.api.protocol.writers;
import java.io.IOException;
import net.sf.briar.api.protocol.MessageId;
/** An interface for creating a have notification. */
public interface OfferWriter {
/**
* Attempts to add the given message ID to the offer and returns true if it
* was added.
*/
boolean writeMessageId(MessageId m) throws IOException;
/** Finishes writing the offer. */
void finish() throws IOException;
}

View File

@@ -0,0 +1,11 @@
package net.sf.briar.api.protocol.writers;
import java.io.IOException;
import java.util.BitSet;
/** An interface for creating a request packet. */
public interface RequestWriter {
/** Writes the contents of the request. */
void writeBitmap(BitSet b) throws IOException;
}

View File

@@ -175,6 +175,16 @@ interface Database<T> {
*/ */
byte[] getMessage(T txn, MessageId m) throws DbException; byte[] getMessage(T txn, MessageId m) throws DbException;
/**
* Returns the message identified by the given ID, in raw format, or null
* if the message is not present in the database or is not sendable to the
* given contact.
* <p>
* Locking: contacts read, messages read, messageStatuses read.
*/
byte[] getMessageIfSendable(T txn, ContactId c, MessageId m)
throws DbException;
/** /**
* Returns the IDs of all messages signed by the given author. * Returns the IDs of all messages signed by the given author.
* <p> * <p>
@@ -197,7 +207,7 @@ interface Database<T> {
* <p> * <p>
* Locking: messages read. * Locking: messages read.
*/ */
Collection<MessageId> getOldMessages(T txn, long size) throws DbException; Collection<MessageId> getOldMessages(T txn, int size) throws DbException;
/** /**
* Returns the parent of the given message. * Returns the parent of the given message.
@@ -228,7 +238,7 @@ interface Database<T> {
* <p> * <p>
* Locking: contacts read, messages read, messageStatuses read. * Locking: contacts read, messages read, messageStatuses read.
*/ */
Collection<MessageId> getSendableMessages(T txn, ContactId c, int capacity) Collection<MessageId> getSendableMessages(T txn, ContactId c, int size)
throws DbException; throws DbException;
/** /**
@@ -271,6 +281,8 @@ interface Database<T> {
/** /**
* Marks the given batches received from the given contact as having been * Marks the given batches received from the given contact as having been
* acknowledged. * acknowledged.
* <p>
* Locking: contacts read, messageStatuses write.
*/ */
void removeBatchesToAck(T txn, ContactId c, Collection<BatchId> sent) void removeBatchesToAck(T txn, ContactId c, Collection<BatchId> sent)
throws DbException; throws DbException;
@@ -330,6 +342,18 @@ interface Database<T> {
void setStatus(T txn, ContactId c, MessageId m, Status s) void setStatus(T txn, ContactId c, MessageId m, Status s)
throws DbException; throws DbException;
/**
* If the database contains the given message and it belongs to a group
* that is visible to the given contact, sets the status of the message
* with respect to the contact to Status.SEEN and returns true; otherwise
* returns false.
* <p>
* Locking: contacts read, messages read, messageStatuses write,
* subscriptions read.
*/
boolean setStatusSeenIfVisible(T txn, ContactId c, MessageId m)
throws DbException;
/** /**
* Sets the subscriptions for the given contact, replacing any existing * Sets the subscriptions for the given contact, replacing any existing
* subscriptions unless the existing subscriptions have a newer timestamp. * subscriptions unless the existing subscriptions have a newer timestamp.

View File

@@ -45,7 +45,7 @@ DatabaseCleaner.Callback {
* Removes the oldest messages from the database, with a total size less * Removes the oldest messages from the database, with a total size less
* than or equal to the given size. * than or equal to the given size.
*/ */
protected abstract void expireMessages(long size) throws DbException; protected abstract void expireMessages(int size) throws DbException;
/** /**
* Calculates and returns the sendability score of a message. * Calculates and returns the sendability score of a message.

View File

@@ -751,6 +751,45 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public byte[] getMessageIfSendable(Connection txn, ContactId c, MessageId m)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT size, raw FROM messages"
+ " JOIN contactSubscriptions"
+ " ON messages.groupId = contactSubscriptions.groupId"
+ " JOIN statuses ON messages.messageId = statuses.messageId"
+ " WHERE messages.messageId = ?"
+ " AND contactSubscriptions.contactId = ?"
+ " AND statuses.contactId = ? AND status = ?"
+ " AND sendability > ZERO()";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) Status.NEW.ordinal());
rs = ps.executeQuery();
byte[] raw = null;
if(rs.next()) {
int size = rs.getInt(1);
Blob b = rs.getBlob(2);
raw = b.getBytes(1, size);
assert raw.length == size;
}
boolean more = rs.next();
assert !more;
rs.close();
ps.close();
return raw;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public Collection<MessageId> getMessagesByAuthor(Connection txn, AuthorId a) public Collection<MessageId> getMessagesByAuthor(Connection txn, AuthorId a)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
@@ -836,7 +875,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public Collection<MessageId> getOldMessages(Connection txn, long capacity) public Collection<MessageId> getOldMessages(Connection txn, int capacity)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
@@ -846,7 +885,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
rs = ps.executeQuery(); rs = ps.executeQuery();
Collection<MessageId> ids = new ArrayList<MessageId>(); Collection<MessageId> ids = new ArrayList<MessageId>();
long total = 0L; int total = 0;
while(rs.next()) { while(rs.next()) {
int size = rs.getInt(1); int size = rs.getInt(1);
if(total + size > capacity) break; if(total + size > capacity) break;
@@ -1356,6 +1395,44 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public boolean setStatusSeenIfVisible(Connection txn, ContactId c,
MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT COUNT(messages.messageId) FROM messages"
+ " JOIN contactSubscriptions"
+ " ON messages.groupId = contactSubscriptions.groupId"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
boolean found = rs.next();
assert found;
int count = rs.getInt(1);
assert count <= 1;
boolean more = rs.next();
assert !more;
rs.close();
ps.close();
if(count == 0) return false;
sql = "UPDATE statuses SET status = ?"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setShort(1, (short) Status.SEEN.ordinal());
ps.setBytes(2, m.getBytes());
ps.setInt(3, c.getInt());
int rowsAffected = ps.executeUpdate();
assert rowsAffected <= 1;
ps.close();
return true;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public void setSubscriptions(Connection txn, ContactId c, public void setSubscriptions(Connection txn, ContactId c,
Collection<Group> subs, long timestamp) throws DbException { Collection<Group> subs, long timestamp) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;

View File

@@ -2,11 +2,10 @@ package net.sf.briar.db;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -23,10 +22,13 @@ import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.Subscriptions; import net.sf.briar.api.protocol.Subscriptions;
import net.sf.briar.api.protocol.Transports; import net.sf.briar.api.protocol.Transports;
import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.AckWriter;
import net.sf.briar.api.protocol.writers.BatchWriter; import net.sf.briar.api.protocol.writers.BatchWriter;
import net.sf.briar.api.protocol.writers.OfferWriter;
import net.sf.briar.api.protocol.writers.RequestWriter;
import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.SubscriptionWriter;
import net.sf.briar.api.protocol.writers.TransportWriter; import net.sf.briar.api.protocol.writers.TransportWriter;
@@ -64,7 +66,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
super(db, cleaner); super(db, cleaner);
} }
protected void expireMessages(long size) throws DbException { protected void expireMessages(int size) throws DbException {
contactLock.readLock().lock(); contactLock.readLock().lock();
try { try {
messageLock.writeLock().lock(); messageLock.writeLock().lock();
@@ -287,7 +289,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
if(!containsContact(c)) throw new NoSuchContactException(); if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock(); messageLock.readLock().lock();
try { try {
Set<MessageId> sent; Collection<MessageId> sent;
int bytesSent = 0; int bytesSent = 0;
messageStatusLock.readLock().lock(); messageStatusLock.readLock().lock();
try { try {
@@ -296,7 +298,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
int capacity = b.getCapacity(); int capacity = b.getCapacity();
Iterator<MessageId> it = Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator(); db.getSendableMessages(txn, c, capacity).iterator();
sent = new HashSet<MessageId>(); sent = new ArrayList<MessageId>();
while(it.hasNext()) { while(it.hasNext()) {
MessageId m = it.next(); MessageId m = it.next();
byte[] message = db.getMessage(txn, m); byte[] message = db.getMessage(txn, m);
@@ -316,7 +318,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
messageStatusLock.readLock().unlock(); messageStatusLock.readLock().unlock();
} }
BatchId id = b.finish(); BatchId id = b.finish();
// Record the contents of the batch, unless it was empty // Record the contents of the batch, unless it's empty
if(sent.isEmpty()) return; if(sent.isEmpty()) return;
messageStatusLock.writeLock().lock(); messageStatusLock.writeLock().lock();
try { try {
@@ -339,6 +341,104 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} }
} }
public Collection<MessageId> generateBatch(ContactId c, BatchWriter b,
Collection<MessageId> requested) throws DbException, IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
Collection<MessageId> sent;
messageStatusLock.readLock().lock();
try{
Txn txn = db.startTransaction();
try {
sent = new ArrayList<MessageId>();
int bytesSent = 0;
for(MessageId m : requested) {
byte[] message = db.getMessageIfSendable(txn, c, m);
if(b == null) continue; // Expired or not sendable
if(!b.writeMessage(message)) break;
bytesSent += message.length;
sent.add(m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.readLock().unlock();
}
BatchId id = b.finish();
// Record the contents of the batch, unless it's empty
if(sent.isEmpty()) return sent;
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
return sent;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public Collection<MessageId> generateOffer(ContactId c, OfferWriter o)
throws DbException, IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
messageStatusLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Collection<MessageId> sendable =
db.getSendableMessages(txn, c, Integer.MAX_VALUE);
Iterator<MessageId> it = sendable.iterator();
Collection<MessageId> sent = new ArrayList<MessageId>();
while(it.hasNext()) {
MessageId m = it.next();
if(!o.writeMessageId(m)) break;
sent.add(m);
}
o.finish();
db.commitTransaction(txn);
return sent;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.readLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void generateSubscriptions(ContactId c, SubscriptionWriter s) public void generateSubscriptions(ContactId c, SubscriptionWriter s)
throws DbException, IOException { throws DbException, IOException {
contactLock.readLock().lock(); contactLock.readLock().lock();
@@ -565,6 +665,50 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} }
} }
public void receiveOffer(ContactId c, Offer o, RequestWriter r)
throws DbException, IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
BitSet request;
Txn txn = db.startTransaction();
try {
Collection<MessageId> offered = o.getMessages();
request = new BitSet(offered.size());
Iterator<MessageId> it = offered.iterator();
for(int i = 0; it.hasNext(); i++) {
// If the message is not in the database or if
// it is not visible to the contact, request it
MessageId m = it.next();
if(!db.setStatusSeenIfVisible(txn, c, m))
request.set(i);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
r.writeBitmap(request);
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveSubscriptions(ContactId c, Subscriptions s) public void receiveSubscriptions(ContactId c, Subscriptions s)
throws DbException { throws DbException {
// Update the contact's subscriptions // Update the contact's subscriptions

View File

@@ -2,11 +2,10 @@ package net.sf.briar.db;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -22,10 +21,13 @@ import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.Subscriptions; import net.sf.briar.api.protocol.Subscriptions;
import net.sf.briar.api.protocol.Transports; import net.sf.briar.api.protocol.Transports;
import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.AckWriter;
import net.sf.briar.api.protocol.writers.BatchWriter; import net.sf.briar.api.protocol.writers.BatchWriter;
import net.sf.briar.api.protocol.writers.OfferWriter;
import net.sf.briar.api.protocol.writers.RequestWriter;
import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.SubscriptionWriter;
import net.sf.briar.api.protocol.writers.TransportWriter; import net.sf.briar.api.protocol.writers.TransportWriter;
@@ -57,7 +59,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
super(db, cleaner); super(db, cleaner);
} }
protected void expireMessages(long size) throws DbException { protected void expireMessages(int size) throws DbException {
synchronized(contactLock) { synchronized(contactLock) {
synchronized(messageLock) { synchronized(messageLock) {
synchronized(messageStatusLock) { synchronized(messageStatusLock) {
@@ -220,7 +222,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
int capacity = b.getCapacity(); int capacity = b.getCapacity();
Iterator<MessageId> it = Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator(); db.getSendableMessages(txn, c, capacity).iterator();
Set<MessageId> sent = new HashSet<MessageId>(); Collection<MessageId> sent = new ArrayList<MessageId>();
int bytesSent = 0; int bytesSent = 0;
while(it.hasNext()) { while(it.hasNext()) {
MessageId m = it.next(); MessageId m = it.next();
@@ -230,7 +232,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
sent.add(m); sent.add(m);
} }
BatchId id = b.finish(); BatchId id = b.finish();
// Record the contents of the batch, unless it was empty // Record the contents of the batch, unless it's empty
if(!sent.isEmpty()) if(!sent.isEmpty())
db.addOutstandingBatch(txn, c, id, sent); db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn); db.commitTransaction(txn);
@@ -246,6 +248,73 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} }
} }
public Collection<MessageId> generateBatch(ContactId c, BatchWriter b,
Collection<MessageId> requested) throws DbException, IOException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
Collection<MessageId> sent = new ArrayList<MessageId>();
int bytesSent = 0;
for(MessageId m : requested) {
byte[] message = db.getMessageIfSendable(txn, c, m);
if(b == null) continue; // Expired or not sendable
if(!b.writeMessage(message)) break;
bytesSent += message.length;
sent.add(m);
}
BatchId id = b.finish();
// Record the contents of the batch, unless it's empty
if(!sent.isEmpty())
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
return sent;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
public Collection<MessageId> generateOffer(ContactId c, OfferWriter o)
throws DbException, IOException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
Collection<MessageId> sendable =
db.getSendableMessages(txn, c, Integer.MAX_VALUE);
Iterator<MessageId> it = sendable.iterator();
Collection<MessageId> sent = new ArrayList<MessageId>();
while(it.hasNext()) {
MessageId m = it.next();
if(!o.writeMessageId(m)) break;
sent.add(m);
}
o.finish();
db.commitTransaction(txn);
return sent;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
public void generateSubscriptions(ContactId c, SubscriptionWriter s) public void generateSubscriptions(ContactId c, SubscriptionWriter s)
throws DbException, IOException { throws DbException, IOException {
synchronized(contactLock) { synchronized(contactLock) {
@@ -421,6 +490,38 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} }
} }
public void receiveOffer(ContactId c, Offer o, RequestWriter r)
throws DbException, IOException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
synchronized(subscriptionLock) {
BitSet request;
Txn txn = db.startTransaction();
try {
Collection<MessageId> offered = o.getMessages();
request = new BitSet(offered.size());
Iterator<MessageId> it = offered.iterator();
for(int i = 0; it.hasNext(); i++) {
// If the message is not in the database or if
// it is not visible to the contact, request it
MessageId m = it.next();
if(!db.setStatusSeenIfVisible(txn, c, m))
request.set(i);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
r.writeBitmap(request);
}
}
}
}
}
public void receiveSubscriptions(ContactId c, Subscriptions s) public void receiveSubscriptions(ContactId c, Subscriptions s)
throws DbException { throws DbException {
// Update the contact's subscriptions // Update the contact's subscriptions