diff --git a/api/net/sf/briar/api/db/DatabaseComponent.java b/api/net/sf/briar/api/db/DatabaseComponent.java index fffeb2dec..bb753b3fc 100644 --- a/api/net/sf/briar/api/db/DatabaseComponent.java +++ b/api/net/sf/briar/api/db/DatabaseComponent.java @@ -12,10 +12,14 @@ import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; +import net.sf.briar.api.protocol.MessageId; +import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.Subscriptions; import net.sf.briar.api.protocol.Transports; import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.BatchWriter; +import net.sf.briar.api.protocol.writers.OfferWriter; +import net.sf.briar.api.protocol.writers.RequestWriter; import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.TransportWriter; @@ -25,14 +29,14 @@ import net.sf.briar.api.protocol.writers.TransportWriter; */ public interface DatabaseComponent { - static final long MEGABYTES = 1024L * 1024L; + static final int MEGABYTES = 1024 * 1024; // FIXME: These should be configurable static final long MIN_FREE_SPACE = 300L * MEGABYTES; static final long CRITICAL_FREE_SPACE = 100L * MEGABYTES; - static final long MAX_BYTES_BETWEEN_SPACE_CHECKS = 5L * MEGABYTES; + static final int MAX_BYTES_BETWEEN_SPACE_CHECKS = 5 * MEGABYTES; static final long MAX_MS_BETWEEN_SPACE_CHECKS = 60L * 1000L; // 1 min - static final long BYTES_PER_SWEEP = 5L * MEGABYTES; + static final int BYTES_PER_SWEEP = 5 * MEGABYTES; /** * Opens the database. @@ -67,6 +71,21 @@ public interface DatabaseComponent { void generateBatch(ContactId c, BatchWriter b) throws DbException, IOException; + /** + * Generates a batch of messages for the given contact from the given + * collection of requested messages, and returns the IDs of the messages + * added to the bacth. + */ + Collection generateBatch(ContactId c, BatchWriter b, + Collection requested) throws DbException, IOException; + + /** + * Generates an offer for the given contact and returns the offered + * message IDs. + */ + Collection generateOffer(ContactId c, OfferWriter h) + throws DbException, IOException; + /** Generates a subscription update for the given contact. */ void generateSubscriptions(ContactId c, SubscriptionWriter s) throws DbException, IOException; @@ -96,6 +115,13 @@ public interface DatabaseComponent { /** Processes a batches of messages from the given contact. */ void receiveBatch(ContactId c, Batch b) throws DbException; + /** + * Processes an offer from the given contact and generates a request for + * any messages in the offer that have not been seen. + */ + void receiveOffer(ContactId c, Offer o, RequestWriter r) throws DbException, + IOException; + /** Processes a subscription update from the given contact. */ void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException; diff --git a/api/net/sf/briar/api/protocol/Offer.java b/api/net/sf/briar/api/protocol/Offer.java new file mode 100644 index 000000000..e65d07394 --- /dev/null +++ b/api/net/sf/briar/api/protocol/Offer.java @@ -0,0 +1,16 @@ +package net.sf.briar.api.protocol; + +import java.util.Collection; + +/** A packet offering the recipient some messages. */ +public interface Offer { + + /** + * The maximum size of a serialised offer, excluding encryption and + * authentication. + */ + static final int MAX_SIZE = (1024 * 1024) - 100; + + /** Returns the message IDs contained in the offer. */ + Collection getMessages(); +} diff --git a/api/net/sf/briar/api/protocol/writers/OfferWriter.java b/api/net/sf/briar/api/protocol/writers/OfferWriter.java new file mode 100644 index 000000000..5ad5ddd2e --- /dev/null +++ b/api/net/sf/briar/api/protocol/writers/OfferWriter.java @@ -0,0 +1,18 @@ +package net.sf.briar.api.protocol.writers; + +import java.io.IOException; + +import net.sf.briar.api.protocol.MessageId; + +/** An interface for creating a have notification. */ +public interface OfferWriter { + + /** + * Attempts to add the given message ID to the offer and returns true if it + * was added. + */ + boolean writeMessageId(MessageId m) throws IOException; + + /** Finishes writing the offer. */ + void finish() throws IOException; +} diff --git a/api/net/sf/briar/api/protocol/writers/RequestWriter.java b/api/net/sf/briar/api/protocol/writers/RequestWriter.java new file mode 100644 index 000000000..2dd52d2fb --- /dev/null +++ b/api/net/sf/briar/api/protocol/writers/RequestWriter.java @@ -0,0 +1,11 @@ +package net.sf.briar.api.protocol.writers; + +import java.io.IOException; +import java.util.BitSet; + +/** An interface for creating a request packet. */ +public interface RequestWriter { + + /** Writes the contents of the request. */ + void writeBitmap(BitSet b) throws IOException; +} diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index c408fe158..fcb55b8f8 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -175,6 +175,16 @@ interface Database { */ byte[] getMessage(T txn, MessageId m) throws DbException; + /** + * Returns the message identified by the given ID, in raw format, or null + * if the message is not present in the database or is not sendable to the + * given contact. + *

+ * Locking: contacts read, messages read, messageStatuses read. + */ + byte[] getMessageIfSendable(T txn, ContactId c, MessageId m) + throws DbException; + /** * Returns the IDs of all messages signed by the given author. *

@@ -197,7 +207,7 @@ interface Database { *

* Locking: messages read. */ - Collection getOldMessages(T txn, long size) throws DbException; + Collection getOldMessages(T txn, int size) throws DbException; /** * Returns the parent of the given message. @@ -228,7 +238,7 @@ interface Database { *

* Locking: contacts read, messages read, messageStatuses read. */ - Collection getSendableMessages(T txn, ContactId c, int capacity) + Collection getSendableMessages(T txn, ContactId c, int size) throws DbException; /** @@ -271,6 +281,8 @@ interface Database { /** * Marks the given batches received from the given contact as having been * acknowledged. + *

+ * Locking: contacts read, messageStatuses write. */ void removeBatchesToAck(T txn, ContactId c, Collection sent) throws DbException; @@ -330,6 +342,18 @@ interface Database { void setStatus(T txn, ContactId c, MessageId m, Status s) throws DbException; + /** + * If the database contains the given message and it belongs to a group + * that is visible to the given contact, sets the status of the message + * with respect to the contact to Status.SEEN and returns true; otherwise + * returns false. + *

+ * Locking: contacts read, messages read, messageStatuses write, + * subscriptions read. + */ + boolean setStatusSeenIfVisible(T txn, ContactId c, MessageId m) + throws DbException; + /** * Sets the subscriptions for the given contact, replacing any existing * subscriptions unless the existing subscriptions have a newer timestamp. diff --git a/components/net/sf/briar/db/DatabaseComponentImpl.java b/components/net/sf/briar/db/DatabaseComponentImpl.java index 45c006ecc..3d98dcc3a 100644 --- a/components/net/sf/briar/db/DatabaseComponentImpl.java +++ b/components/net/sf/briar/db/DatabaseComponentImpl.java @@ -45,7 +45,7 @@ DatabaseCleaner.Callback { * Removes the oldest messages from the database, with a total size less * than or equal to the given size. */ - protected abstract void expireMessages(long size) throws DbException; + protected abstract void expireMessages(int size) throws DbException; /** * Calculates and returns the sendability score of a message. diff --git a/components/net/sf/briar/db/JdbcDatabase.java b/components/net/sf/briar/db/JdbcDatabase.java index e7bf13d0b..93fd24bef 100644 --- a/components/net/sf/briar/db/JdbcDatabase.java +++ b/components/net/sf/briar/db/JdbcDatabase.java @@ -751,6 +751,45 @@ abstract class JdbcDatabase implements Database { } } + public byte[] getMessageIfSendable(Connection txn, ContactId c, MessageId m) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT size, raw FROM messages" + + " JOIN contactSubscriptions" + + " ON messages.groupId = contactSubscriptions.groupId" + + " JOIN statuses ON messages.messageId = statuses.messageId" + + " WHERE messages.messageId = ?" + + " AND contactSubscriptions.contactId = ?" + + " AND statuses.contactId = ? AND status = ?" + + " AND sendability > ZERO()"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); + ps.setInt(3, c.getInt()); + ps.setShort(4, (short) Status.NEW.ordinal()); + rs = ps.executeQuery(); + byte[] raw = null; + if(rs.next()) { + int size = rs.getInt(1); + Blob b = rs.getBlob(2); + raw = b.getBytes(1, size); + assert raw.length == size; + } + boolean more = rs.next(); + assert !more; + rs.close(); + ps.close(); + return raw; + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + public Collection getMessagesByAuthor(Connection txn, AuthorId a) throws DbException { PreparedStatement ps = null; @@ -836,7 +875,7 @@ abstract class JdbcDatabase implements Database { } } - public Collection getOldMessages(Connection txn, long capacity) + public Collection getOldMessages(Connection txn, int capacity) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -846,7 +885,7 @@ abstract class JdbcDatabase implements Database { ps = txn.prepareStatement(sql); rs = ps.executeQuery(); Collection ids = new ArrayList(); - long total = 0L; + int total = 0; while(rs.next()) { int size = rs.getInt(1); if(total + size > capacity) break; @@ -1356,6 +1395,44 @@ abstract class JdbcDatabase implements Database { } } + public boolean setStatusSeenIfVisible(Connection txn, ContactId c, + MessageId m) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT COUNT(messages.messageId) FROM messages" + + " JOIN contactSubscriptions" + + " ON messages.groupId = contactSubscriptions.groupId" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + rs = ps.executeQuery(); + boolean found = rs.next(); + assert found; + int count = rs.getInt(1); + assert count <= 1; + boolean more = rs.next(); + assert !more; + rs.close(); + ps.close(); + if(count == 0) return false; + sql = "UPDATE statuses SET status = ?" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setShort(1, (short) Status.SEEN.ordinal()); + ps.setBytes(2, m.getBytes()); + ps.setInt(3, c.getInt()); + int rowsAffected = ps.executeUpdate(); + assert rowsAffected <= 1; + ps.close(); + return true; + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + public void setSubscriptions(Connection txn, ContactId c, Collection subs, long timestamp) throws DbException { PreparedStatement ps = null; diff --git a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java index 41f38f6ef..2dafd0a1b 100644 --- a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java +++ b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java @@ -2,11 +2,10 @@ package net.sf.briar.db; import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -23,10 +22,13 @@ import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; +import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.Subscriptions; import net.sf.briar.api.protocol.Transports; import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.BatchWriter; +import net.sf.briar.api.protocol.writers.OfferWriter; +import net.sf.briar.api.protocol.writers.RequestWriter; import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.TransportWriter; @@ -64,7 +66,7 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { super(db, cleaner); } - protected void expireMessages(long size) throws DbException { + protected void expireMessages(int size) throws DbException { contactLock.readLock().lock(); try { messageLock.writeLock().lock(); @@ -287,7 +289,7 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { - Set sent; + Collection sent; int bytesSent = 0; messageStatusLock.readLock().lock(); try { @@ -296,7 +298,7 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { int capacity = b.getCapacity(); Iterator it = db.getSendableMessages(txn, c, capacity).iterator(); - sent = new HashSet(); + sent = new ArrayList(); while(it.hasNext()) { MessageId m = it.next(); byte[] message = db.getMessage(txn, m); @@ -316,7 +318,7 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { messageStatusLock.readLock().unlock(); } BatchId id = b.finish(); - // Record the contents of the batch, unless it was empty + // Record the contents of the batch, unless it's empty if(sent.isEmpty()) return; messageStatusLock.writeLock().lock(); try { @@ -339,6 +341,104 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { } } + public Collection generateBatch(ContactId c, BatchWriter b, + Collection requested) throws DbException, IOException { + contactLock.readLock().lock(); + try { + if(!containsContact(c)) throw new NoSuchContactException(); + messageLock.readLock().lock(); + try { + Collection sent; + messageStatusLock.readLock().lock(); + try{ + Txn txn = db.startTransaction(); + try { + sent = new ArrayList(); + int bytesSent = 0; + for(MessageId m : requested) { + byte[] message = db.getMessageIfSendable(txn, c, m); + if(b == null) continue; // Expired or not sendable + if(!b.writeMessage(message)) break; + bytesSent += message.length; + sent.add(m); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.readLock().unlock(); + } + BatchId id = b.finish(); + // Record the contents of the batch, unless it's empty + if(sent.isEmpty()) return sent; + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + db.addOutstandingBatch(txn, c, id, sent); + db.commitTransaction(txn); + return sent; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + + public Collection generateOffer(ContactId c, OfferWriter o) + throws DbException, IOException { + contactLock.readLock().lock(); + try { + if(!containsContact(c)) throw new NoSuchContactException(); + messageLock.readLock().lock(); + try { + messageStatusLock.readLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + Collection sendable = + db.getSendableMessages(txn, c, Integer.MAX_VALUE); + Iterator it = sendable.iterator(); + Collection sent = new ArrayList(); + while(it.hasNext()) { + MessageId m = it.next(); + if(!o.writeMessageId(m)) break; + sent.add(m); + } + o.finish(); + db.commitTransaction(txn); + return sent; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.readLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + public void generateSubscriptions(ContactId c, SubscriptionWriter s) throws DbException, IOException { contactLock.readLock().lock(); @@ -565,6 +665,50 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { } } + public void receiveOffer(ContactId c, Offer o, RequestWriter r) + throws DbException, IOException { + contactLock.readLock().lock(); + try { + if(!containsContact(c)) throw new NoSuchContactException(); + messageLock.readLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + subscriptionLock.readLock().lock(); + try { + BitSet request; + Txn txn = db.startTransaction(); + try { + Collection offered = o.getMessages(); + request = new BitSet(offered.size()); + Iterator it = offered.iterator(); + for(int i = 0; it.hasNext(); i++) { + // If the message is not in the database or if + // it is not visible to the contact, request it + MessageId m = it.next(); + if(!db.setStatusSeenIfVisible(txn, c, m)) + request.set(i); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + r.writeBitmap(request); + } finally { + subscriptionLock.readLock().unlock(); + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + public void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException { // Update the contact's subscriptions diff --git a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java index dc9241d8a..a9e90c8ba 100644 --- a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java +++ b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java @@ -2,11 +2,10 @@ package net.sf.briar.db; import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -22,10 +21,13 @@ import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; +import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.Subscriptions; import net.sf.briar.api.protocol.Transports; import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.BatchWriter; +import net.sf.briar.api.protocol.writers.OfferWriter; +import net.sf.briar.api.protocol.writers.RequestWriter; import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.TransportWriter; @@ -57,7 +59,7 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { super(db, cleaner); } - protected void expireMessages(long size) throws DbException { + protected void expireMessages(int size) throws DbException { synchronized(contactLock) { synchronized(messageLock) { synchronized(messageStatusLock) { @@ -220,7 +222,7 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { int capacity = b.getCapacity(); Iterator it = db.getSendableMessages(txn, c, capacity).iterator(); - Set sent = new HashSet(); + Collection sent = new ArrayList(); int bytesSent = 0; while(it.hasNext()) { MessageId m = it.next(); @@ -230,7 +232,7 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { sent.add(m); } BatchId id = b.finish(); - // Record the contents of the batch, unless it was empty + // Record the contents of the batch, unless it's empty if(!sent.isEmpty()) db.addOutstandingBatch(txn, c, id, sent); db.commitTransaction(txn); @@ -246,6 +248,73 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { } } + public Collection generateBatch(ContactId c, BatchWriter b, + Collection requested) throws DbException, IOException { + synchronized(contactLock) { + if(!containsContact(c)) throw new NoSuchContactException(); + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + Collection sent = new ArrayList(); + int bytesSent = 0; + for(MessageId m : requested) { + byte[] message = db.getMessageIfSendable(txn, c, m); + if(b == null) continue; // Expired or not sendable + if(!b.writeMessage(message)) break; + bytesSent += message.length; + sent.add(m); + } + BatchId id = b.finish(); + // Record the contents of the batch, unless it's empty + if(!sent.isEmpty()) + db.addOutstandingBatch(txn, c, id, sent); + db.commitTransaction(txn); + return sent; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + + public Collection generateOffer(ContactId c, OfferWriter o) + throws DbException, IOException { + synchronized(contactLock) { + if(!containsContact(c)) throw new NoSuchContactException(); + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + Collection sendable = + db.getSendableMessages(txn, c, Integer.MAX_VALUE); + Iterator it = sendable.iterator(); + Collection sent = new ArrayList(); + while(it.hasNext()) { + MessageId m = it.next(); + if(!o.writeMessageId(m)) break; + sent.add(m); + } + o.finish(); + db.commitTransaction(txn); + return sent; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + public void generateSubscriptions(ContactId c, SubscriptionWriter s) throws DbException, IOException { synchronized(contactLock) { @@ -421,6 +490,38 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { } } + public void receiveOffer(ContactId c, Offer o, RequestWriter r) + throws DbException, IOException { + synchronized(contactLock) { + if(!containsContact(c)) throw new NoSuchContactException(); + synchronized(messageLock) { + synchronized(messageStatusLock) { + synchronized(subscriptionLock) { + BitSet request; + Txn txn = db.startTransaction(); + try { + Collection offered = o.getMessages(); + request = new BitSet(offered.size()); + Iterator it = offered.iterator(); + for(int i = 0; it.hasNext(); i++) { + // If the message is not in the database or if + // it is not visible to the contact, request it + MessageId m = it.next(); + if(!db.setStatusSeenIfVisible(txn, c, m)) + request.set(i); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + r.writeBitmap(request); + } + } + } + } + } + public void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException { // Update the contact's subscriptions