diff --git a/api/net/sf/briar/api/db/NeighbourId.java b/api/net/sf/briar/api/db/ContactId.java similarity index 65% rename from api/net/sf/briar/api/db/NeighbourId.java rename to api/net/sf/briar/api/db/ContactId.java index 943db44f1..3e793c4bf 100644 --- a/api/net/sf/briar/api/db/NeighbourId.java +++ b/api/net/sf/briar/api/db/ContactId.java @@ -1,11 +1,11 @@ package net.sf.briar.api.db; -/** Uniquely identifies a neighbour. */ -public class NeighbourId { +/** Uniquely identifies a contact. */ +public class ContactId { private final int id; - public NeighbourId(int id) { + public ContactId(int id) { this.id = id; } @@ -15,7 +15,7 @@ public class NeighbourId { @Override public boolean equals(Object o) { - if(o instanceof NeighbourId) return id == ((NeighbourId) o).id; + if(o instanceof ContactId) return id == ((ContactId) o).id; return false; } diff --git a/api/net/sf/briar/api/db/DatabaseComponent.java b/api/net/sf/briar/api/db/DatabaseComponent.java index b71012781..59261e2e1 100644 --- a/api/net/sf/briar/api/db/DatabaseComponent.java +++ b/api/net/sf/briar/api/db/DatabaseComponent.java @@ -29,14 +29,14 @@ public interface DatabaseComponent { /** Waits for any open transactions to finish and closes the database. */ void close() throws DbException; + /** Adds a new contact to the database. */ + void addContact(ContactId c) throws DbException; + /** Adds a locally generated message to the database. */ void addLocallyGeneratedMessage(Message m) throws DbException; - /** Adds a new neighbour to the database. */ - void addNeighbour(NeighbourId n) throws DbException; - - /** Generates a bundle of messages for the given neighbour. */ - void generateBundle(NeighbourId n, Bundle b) throws DbException; + /** Generates a bundle of messages for the given contact. */ + void generateBundle(ContactId c, Bundle b) throws DbException; /** Returns the user's rating for the given author. */ Rating getRating(AuthorId a) throws DbException; @@ -45,13 +45,13 @@ public interface DatabaseComponent { Set getSubscriptions() throws DbException; /** - * Processes a bundle of messages received from the given neighbour. Some + * Processes a bundle of messages received from the given contact. Some * or all of the messages in the bundle may be stored. */ - void receiveBundle(NeighbourId n, Bundle b) throws DbException; + void receiveBundle(ContactId c, Bundle b) throws DbException; - /** Removes a neighbour (and all associated state) from the database. */ - void removeNeighbour(NeighbourId n) throws DbException; + /** Removes a contact (and all associated state) from the database. */ + void removeContact(ContactId c) throws DbException; /** Records the user's rating for the given author. */ void setRating(AuthorId a, Rating r) throws DbException; diff --git a/api/net/sf/briar/api/db/Status.java b/api/net/sf/briar/api/db/Status.java index 0f80ba176..92a7070e6 100644 --- a/api/net/sf/briar/api/db/Status.java +++ b/api/net/sf/briar/api/db/Status.java @@ -1,17 +1,11 @@ package net.sf.briar.api.db; -/** The status of a message with respect to a neighbour. */ +/** The status of a message with respect to a particular contact. */ public enum Status { - /** - * The message has not been sent to, received from, or acked by the - * neighbour. - */ + /** The message has not been sent, received, or acked. */ NEW, - /** - * The message has been sent to, but not received from or acked by, the - * neighbour. - */ + /** The message has been sent, but not received or acked. */ SENT, - /** The message has been received from or acked by the neighbour. */ + /** The message has been received or acked. */ SEEN } diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index b00fa2ee0..7c3021d91 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -3,7 +3,7 @@ package net.sf.briar.db; import java.util.Set; import net.sf.briar.api.db.DbException; -import net.sf.briar.api.db.NeighbourId; +import net.sf.briar.api.db.ContactId; import net.sf.briar.api.db.Rating; import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; @@ -62,7 +62,7 @@ interface Database { *

* Locking: contacts read, messageStatuses write. */ - void addBatchToAck(T txn, NeighbourId n, BatchId b) throws DbException; + void addBatchToAck(T txn, ContactId c, BatchId b) throws DbException; /** * Returns false if the given message is already in the database. Otherwise @@ -73,18 +73,18 @@ interface Database { boolean addMessage(T txn, Message m) throws DbException; /** - * Adds a new neighbour to the database. + * Adds a new contact to the database. *

* Locking: contacts write, messageStatuses write. */ - void addNeighbour(T txn, NeighbourId n) throws DbException; + void addContact(T txn, ContactId c) throws DbException; /** * Records a sent batch as needing to be acknowledged. *

* Locking: contacts read, messages read, messageStatuses write. */ - void addOutstandingBatch(T txn, NeighbourId n, BatchId b, Set sent) throws DbException; + void addOutstandingBatch(T txn, ContactId c, BatchId b, Set sent) throws DbException; /** * Records a received bundle. This should be called after processing the @@ -93,7 +93,7 @@ interface Database { *

* Locking: contacts read, messages read, messageStatuses write. */ - Set addReceivedBundle(T txn, NeighbourId n, BundleId b) throws DbException; + Set addReceivedBundle(T txn, ContactId c, BundleId b) throws DbException; /** * Subscribes to the given group. @@ -103,18 +103,25 @@ interface Database { void addSubscription(T txn, GroupId g) throws DbException; /** - * Records a neighbour's subscription to a group. + * Records a contact's subscription to a group. *

* Locking: contacts read, messageStatuses write. */ - void addSubscription(T txn, NeighbourId n, GroupId g) throws DbException; + void addSubscription(T txn, ContactId c, GroupId g) throws DbException; /** - * Removes all recorded subscriptions for the given neighbour. + * Removes all recorded subscriptions for the given contact. *

* Locking: contacts read, messageStatuses write. */ - void clearSubscriptions(T txn, NeighbourId n) throws DbException; + void clearSubscriptions(T txn, ContactId c) throws DbException; + + /** + * Returns true iff the database contains the given contact. + *

+ * Locking: contacts read. + */ + boolean containsContact(T txn, ContactId c) throws DbException; /** * Returns true iff the database contains the given message. @@ -123,13 +130,6 @@ interface Database { */ boolean containsMessage(T txn, MessageId m) throws DbException; - /** - * Returns true iff the database contains the given neighbour. - *

- * Locking: contacts read. - */ - boolean containsNeighbour(T txn, NeighbourId n) throws DbException; - /** * Returns true iff the user is subscribed to the given group. *

@@ -137,6 +137,13 @@ interface Database { */ boolean containsSubscription(T txn, GroupId g) throws DbException; + /** + * Returns the IDs of all contacts. + *

+ * Locking: contacts read, messageStatuses read. + */ + Set getContacts(T txn) throws DbException; + /** * Returns the amount of free storage space available to the database, in * bytes. This is based on the minimum of the space available on the device @@ -168,13 +175,6 @@ interface Database { */ Iterable getMessagesByParent(T txn, MessageId m) throws DbException; - /** - * Returns the IDs of all neighbours. - *

- * Locking: contacts read, messageStatuses read. - */ - Set getNeighbours(T txn) throws DbException; - /** * Returns the IDs of the oldest messages in the database, with a total * size less than or equal to the given size. @@ -200,7 +200,7 @@ interface Database { /** * Returns the sendability score of the given message. Messages with * sendability scores greater than zero are eligible to be sent to - * neighbours. + * contacts. *

* Locking: messages read. */ @@ -208,11 +208,11 @@ interface Database { /** * Returns the IDs of some messages that are eligible to be sent to the - * given neighbour, with a total size less than or equal to the given size. + * given contact, with a total size less than or equal to the given size. *

* Locking: contacts read, messages read, messageStatuses read. */ - Iterable getSendableMessages(T txn, NeighbourId n, long capacity) throws DbException; + Iterable getSendableMessages(T txn, ContactId c, long capacity) throws DbException; /** * Returns the groups to which the user subscribes. @@ -224,28 +224,35 @@ interface Database { /** * Removes an outstanding batch that has been acknowledged. Any messages in * the batch that are still considered outstanding (Status.SENT) with - * respect to the given neighbour are now considered seen (Status.SEEN). + * respect to the given contact are now considered seen (Status.SEEN). *

* Locking: contacts read, messages read, messageStatuses write. */ - void removeAckedBatch(T txn, NeighbourId n, BatchId b) throws DbException; + void removeAckedBatch(T txn, ContactId c, BatchId b) throws DbException; /** * Removes and returns the IDs of any batches received from the given - * neighbour that need to be acknowledged. + * contact that need to be acknowledged. *

* Locking: contacts read, messageStatuses write. */ - Set removeBatchesToAck(T txn, NeighbourId n) throws DbException; + Set removeBatchesToAck(T txn, ContactId c) throws DbException; + + /** + * Removes a contact (and all associated state) from the database. + *

+ * Locking: contacts write, messageStatuses write. + */ + void removeContact(T txn, ContactId c) throws DbException; /** * Removes an outstanding batch that has been lost. Any messages in the * batch that are still considered outstanding (Status.SENT) with respect - * to the given neighbour are now considered unsent (Status.NEW). + * to the given contact are now considered unsent (Status.NEW). *

* Locking: contacts read, messages read, messageStatuses write. */ - void removeLostBatch(T txn, NeighbourId n, BatchId b) throws DbException; + void removeLostBatch(T txn, ContactId c, BatchId b) throws DbException; /** * Removes a message (and all associated state) from the database. @@ -254,13 +261,6 @@ interface Database { */ void removeMessage(T txn, MessageId m) throws DbException; - /** - * Removes a neighbour (and all associated state) from the database. - *

- * Locking: contacts write, messageStatuses write. - */ - void removeNeighbour(T txn, NeighbourId n) throws DbException; - /** * Unsubscribes from the given group. Any messages belonging to the group * are deleted from the database. @@ -285,9 +285,9 @@ interface Database { void setSendability(T txn, MessageId m, int sendability) throws DbException; /** - * Sets the status of the given message with respect to the given neighbour. + * Sets the status of the given message with respect to the given contact. *

* Locking: contacts read, messages read, messageStatuses write. */ - void setStatus(T txn, NeighbourId n, MessageId m, Status s) throws DbException; + void setStatus(T txn, ContactId c, MessageId m, Status s) throws DbException; } diff --git a/components/net/sf/briar/db/DatabaseComponentImpl.java b/components/net/sf/briar/db/DatabaseComponentImpl.java index 638eecd38..ed6ba2691 100644 --- a/components/net/sf/briar/db/DatabaseComponentImpl.java +++ b/components/net/sf/briar/db/DatabaseComponentImpl.java @@ -5,7 +5,7 @@ import java.util.logging.Logger; import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DbException; -import net.sf.briar.api.db.NeighbourId; +import net.sf.briar.api.db.ContactId; import net.sf.briar.api.db.Rating; import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; @@ -75,10 +75,10 @@ abstract class DatabaseComponentImpl implements DatabaseComponent { } // Locking: contacts read - protected boolean containsNeighbour(NeighbourId n) throws DbException { + protected boolean containsContact(ContactId c) throws DbException { Txn txn = db.startTransaction(); try { - boolean contains = db.containsNeighbour(txn, n); + boolean contains = db.containsContact(txn, c); db.commitTransaction(txn); return contains; } catch(DbException e) { @@ -141,16 +141,16 @@ abstract class DatabaseComponentImpl implements DatabaseComponent { } // Locking: contacts read, messages write, messageStatuses write - protected boolean storeMessage(Txn txn, Message m, NeighbourId sender) + protected boolean storeMessage(Txn txn, Message m, ContactId sender) throws DbException { boolean added = db.addMessage(txn, m); // Mark the message as seen by the sender MessageId id = m.getId(); if(sender != null) db.setStatus(txn, sender, id, Status.SEEN); if(added) { - // Mark the message as unseen by other neighbours - for(NeighbourId n : db.getNeighbours(txn)) { - if(!n.equals(sender)) db.setStatus(txn, n, id, Status.NEW); + // Mark the message as unseen by other contacts + for(ContactId c : db.getContacts(txn)) { + if(!c.equals(sender)) db.setStatus(txn, c, id, Status.NEW); } // Calculate and store the message's sendability int sendability = calculateSendability(txn, m); diff --git a/components/net/sf/briar/db/JdbcDatabase.java b/components/net/sf/briar/db/JdbcDatabase.java index 173d0ac7b..2f8611751 100644 --- a/components/net/sf/briar/db/JdbcDatabase.java +++ b/components/net/sf/briar/db/JdbcDatabase.java @@ -19,7 +19,7 @@ import java.util.logging.Logger; import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DbException; -import net.sf.briar.api.db.NeighbourId; +import net.sf.briar.api.db.ContactId; import net.sf.briar.api.db.Rating; import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; @@ -64,46 +64,46 @@ abstract class JdbcDatabase implements Database { private static final String INDEX_MESSAGES_BY_SENDABILITY = "CREATE INDEX messagesBySendability ON messages (sendability)"; - private static final String CREATE_NEIGHBOURS = - "CREATE TABLE neighbours" - + " (neighbourId INT NOT NULL," + private static final String CREATE_CONTACTS = + "CREATE TABLE contacts" + + " (contactId INT NOT NULL," + " lastBundleReceived XXXX NOT NULL," - + " PRIMARY KEY (neighbourId))"; + + " PRIMARY KEY (contactId))"; private static final String CREATE_BATCHES_TO_ACK = "CREATE TABLE batchesToAck" + " (batchId XXXX NOT NULL," - + " neighbourId INT NOT NULL," + + " contactId INT NOT NULL," + " PRIMARY KEY (batchId)," - + " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)" + + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE)"; - private static final String CREATE_NEIGHBOUR_SUBSCRIPTIONS = - "CREATE TABLE neighbourSubscriptions" - + " (neighbourId INT NOT NULL," + private static final String CREATE_CONTACT_SUBSCRIPTIONS = + "CREATE TABLE contactSubscriptions" + + " (contactId INT NOT NULL," + " groupId XXXX NOT NULL," - + " PRIMARY KEY (neighbourId, groupId)," - + " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)" + + " PRIMARY KEY (contactId, groupId)," + + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE)"; private static final String CREATE_OUTSTANDING_BATCHES = "CREATE TABLE outstandingBatches" + " (batchId XXXX NOT NULL," - + " neighbourId INT NOT NULL," + + " contactId INT NOT NULL," + " lastBundleReceived XXXX NOT NULL," + " PRIMARY KEY (batchId)," - + " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)" + + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE)"; private static final String CREATE_OUTSTANDING_MESSAGES = "CREATE TABLE outstandingMessages" + " (batchId XXXX NOT NULL," - + " neighbourId INT NOT NULL," + + " contactId INT NOT NULL," + " messageId XXXX NOT NULL," + " PRIMARY KEY (batchId, messageId)," + " FOREIGN KEY (batchId) REFERENCES outstandingBatches (batchId)" + " ON DELETE CASCADE," - + " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)" + + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE," + " FOREIGN KEY (messageId) REFERENCES messages (messageId)" + " ON DELETE CASCADE)"; @@ -121,28 +121,28 @@ abstract class JdbcDatabase implements Database { private static final String CREATE_RECEIVED_BUNDLES = "CREATE TABLE receivedBundles" + " (bundleId XXXX NOT NULL," - + " neighbourId INT NOT NULL," + + " contactId INT NOT NULL," + " timestamp BIGINT NOT NULL," + " PRIMARY KEY (bundleId)," - + " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)" + + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE)"; private static final String CREATE_STATUSES = "CREATE TABLE statuses" + " (messageId XXXX NOT NULL," - + " neighbourId INT NOT NULL," + + " contactId INT NOT NULL," + " status SMALLINT NOT NULL," - + " PRIMARY KEY (messageId, neighbourId)," + + " PRIMARY KEY (messageId, contactId)," + " FOREIGN KEY (messageId) REFERENCES messages (messageId)" + " ON DELETE CASCADE," - + " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)" + + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE)"; private static final String INDEX_STATUSES_BY_MESSAGE = "CREATE INDEX statusesByMessage ON statuses (messageId)"; - private static final String INDEX_STATUSES_BY_NEIGHBOUR = - "CREATE INDEX statusesByNeighbour ON statuses (neighbourId)"; + private static final String INDEX_STATUSES_BY_CONTACT = + "CREATE INDEX statusesByContact ON statuses (contactId)"; private static final Logger LOG = Logger.getLogger(JdbcDatabase.class.getName()); @@ -207,14 +207,14 @@ abstract class JdbcDatabase implements Database { s.executeUpdate(INDEX_MESSAGES_BY_TIMESTAMP); s.executeUpdate(INDEX_MESSAGES_BY_SENDABILITY); if(LOG.isLoggable(Level.FINE)) - LOG.fine("Creating neighbours table"); - s.executeUpdate(insertHashType(CREATE_NEIGHBOURS)); + LOG.fine("Creating contacts table"); + s.executeUpdate(insertHashType(CREATE_CONTACTS)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating batchesToAck table"); s.executeUpdate(insertHashType(CREATE_BATCHES_TO_ACK)); if(LOG.isLoggable(Level.FINE)) - LOG.fine("Creating neighbourSubscriptions table"); - s.executeUpdate(insertHashType(CREATE_NEIGHBOUR_SUBSCRIPTIONS)); + LOG.fine("Creating contactSubscriptions table"); + s.executeUpdate(insertHashType(CREATE_CONTACT_SUBSCRIPTIONS)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating outstandingBatches table"); s.executeUpdate(insertHashType(CREATE_OUTSTANDING_BATCHES)); @@ -232,7 +232,7 @@ abstract class JdbcDatabase implements Database { LOG.fine("Creating statuses table"); s.executeUpdate(insertHashType(CREATE_STATUSES)); s.executeUpdate(INDEX_STATUSES_BY_MESSAGE); - s.executeUpdate(INDEX_STATUSES_BY_NEIGHBOUR); + s.executeUpdate(INDEX_STATUSES_BY_CONTACT); s.close(); } catch(SQLException e) { tryToClose(s); @@ -340,16 +340,35 @@ abstract class JdbcDatabase implements Database { } } - public void addBatchToAck(Connection txn, NeighbourId n, BatchId b) + public void addBatchToAck(Connection txn, ContactId c, BatchId b) throws DbException { PreparedStatement ps = null; try { String sql = "INSERT INTO batchesToAck" - + " (batchId, neighbourId)" + + " (batchId, contactId)" + " VALUES (?, ?)"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); - ps.setInt(2, n.getInt()); + ps.setInt(2, c.getInt()); + int rowsAffected = ps.executeUpdate(); + assert rowsAffected == 1; + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + + public void addContact(Connection txn, ContactId c) throws DbException { + PreparedStatement ps = null; + try { + String sql = "INSERT INTO contacts" + + " (contactId, lastBundleReceived)" + + " VALUES (?, ?)"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setBytes(2, BundleId.NONE.getBytes()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); @@ -388,35 +407,16 @@ abstract class JdbcDatabase implements Database { } } - public void addNeighbour(Connection txn, NeighbourId n) throws DbException { - PreparedStatement ps = null; - try { - String sql = "INSERT INTO neighbours" - + " (neighbourId, lastBundleReceived)" - + " VALUES (?, ?)"; - ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); - ps.setBytes(2, BundleId.NONE.getBytes()); - int rowsAffected = ps.executeUpdate(); - assert rowsAffected == 1; - ps.close(); - } catch(SQLException e) { - tryToClose(ps); - tryToClose(txn); - throw new DbException(e); - } - } - - public void addOutstandingBatch(Connection txn, NeighbourId n, BatchId b, + public void addOutstandingBatch(Connection txn, ContactId c, BatchId b, Set sent) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - // Find the ID of the last bundle received from n - String sql = "SELECT lastBundleReceived FROM neighbours" - + " WHERE neighbourId = ?"; + // Find the ID of the last bundle received from c + String sql = "SELECT lastBundleReceived FROM contacts" + + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); rs = ps.executeQuery(); boolean found = rs.next(); assert found; @@ -427,22 +427,22 @@ abstract class JdbcDatabase implements Database { ps.close(); // Create an outstanding batch row sql = "INSERT INTO outstandingBatches" - + " (batchId, neighbourId, lastBundleReceived)" + + " (batchId, contactId, lastBundleReceived)" + " VALUES (?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); - ps.setInt(2, n.getInt()); + ps.setInt(2, c.getInt()); ps.setBytes(3, lastBundleReceived); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); // Create an outstanding message row for each message in the batch sql = "INSERT INTO outstandingMessages" - + " (batchId, neighbourId, messageId)" + + " (batchId, contactId, messageId)" + " VALUES (?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); - ps.setInt(2, n.getInt()); + ps.setInt(2, c.getInt()); for(MessageId m : sent) { ps.setBytes(3, m.getBytes()); ps.addBatch(); @@ -455,10 +455,10 @@ abstract class JdbcDatabase implements Database { ps.close(); // Set the status of each message in the batch to SENT sql = "UPDATE statuses SET status = ?" - + " WHERE messageId = ? AND neighbourId = ? AND status = ?"; + + " WHERE messageId = ? AND contactId = ? AND status = ?"; ps = txn.prepareStatement(sql); ps.setShort(1, (short) Status.SENT.ordinal()); - ps.setInt(3, n.getInt()); + ps.setInt(3, c.getInt()); ps.setShort(4, (short) Status.NEW.ordinal()); for(MessageId m : sent) { ps.setBytes(2, m.getBytes()); @@ -478,25 +478,25 @@ abstract class JdbcDatabase implements Database { } } - public Set addReceivedBundle(Connection txn, NeighbourId n, + public Set addReceivedBundle(Connection txn, ContactId c, BundleId b) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - // Update the ID of the last bundle received from n - String sql = "UPDATE neighbours SET lastBundleReceived = ?" - + " WHERE neighbourId = ?"; + // Update the ID of the last bundle received from c + String sql = "UPDATE contacts SET lastBundleReceived = ?" + + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); - ps.setInt(2, n.getInt()); + ps.setInt(2, c.getInt()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); - // Count the received bundle records for n and find the oldest + // Count the received bundle records for c and find the oldest sql = "SELECT bundleId, timestamp FROM receivedBundles" - + " WHERE neighbourId = ?"; + + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); rs = ps.executeQuery(); int received = 0; long oldestTimestamp = Long.MAX_VALUE; @@ -516,7 +516,7 @@ abstract class JdbcDatabase implements Database { if(received == DatabaseComponent.RETRANSMIT_THRESHOLD) { // Expire batches related to the oldest received bundle assert oldestBundle != null; - lost = findLostBatches(txn, n, oldestBundle); + lost = findLostBatches(txn, c, oldestBundle); sql = "DELETE FROM receivedBundles WHERE bundleId = ?"; ps = txn.prepareStatement(sql); ps.setBytes(1, oldestBundle); @@ -528,11 +528,11 @@ abstract class JdbcDatabase implements Database { } // Record the new received bundle sql = "INSERT INTO receivedBundles" - + " (bundleId, neighbourId, timestamp)" + + " (bundleId, contactId, timestamp)" + " VALUES (?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); - ps.setInt(2, n.getInt()); + ps.setInt(2, c.getInt()); ps.setLong(3, System.currentTimeMillis()); rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; @@ -546,15 +546,15 @@ abstract class JdbcDatabase implements Database { } } - private Set findLostBatches(Connection txn, NeighbourId n, + private Set findLostBatches(Connection txn, ContactId c, byte[] lastBundleReceived) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { String sql = "SELECT batchId FROM outstandingBatches" - + " WHERE neighbourId = ? AND lastBundleReceived = ?"; + + " WHERE contactId = ? AND lastBundleReceived = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); ps.setBytes(2, lastBundleReceived); rs = ps.executeQuery(); Set lost = new HashSet(); @@ -586,15 +586,15 @@ abstract class JdbcDatabase implements Database { } } - public void addSubscription(Connection txn, NeighbourId n, GroupId g) + public void addSubscription(Connection txn, ContactId c, GroupId g) throws DbException { PreparedStatement ps = null; try { - String sql = "INSERT INTO neighbourSubscriptions" - + " (neighbourId, groupId)" + String sql = "INSERT INTO contactSubscriptions" + + " (contactId, groupId)" + " VALUES (?, ?)"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); ps.setBytes(2, g.getBytes()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; @@ -606,14 +606,14 @@ abstract class JdbcDatabase implements Database { } } - public void clearSubscriptions(Connection txn, NeighbourId n) + public void clearSubscriptions(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; try { - String sql = "DELETE FROM neighbourSubscriptions" - + " WHERE neighbourId = ?"; + String sql = "DELETE FROM contactSubscriptions" + + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); ps.executeUpdate(); ps.close(); } catch(SQLException e) { @@ -623,15 +623,15 @@ abstract class JdbcDatabase implements Database { } } - public boolean containsMessage(Connection txn, MessageId m) + public boolean containsContact(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT COUNT(messageId) FROM messages" - + " WHERE messageId = ?"; + String sql = "SELECT COUNT(contactId) FROM contacts" + + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); + ps.setInt(1, c.getInt()); rs = ps.executeQuery(); boolean found = rs.next(); assert found; @@ -650,15 +650,15 @@ abstract class JdbcDatabase implements Database { } } - public boolean containsNeighbour(Connection txn, NeighbourId n) + public boolean containsMessage(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT COUNT(neighbourId) FROM neighbours" - + " WHERE neighbourId = ?"; + String sql = "SELECT COUNT(messageId) FROM messages" + + " WHERE messageId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setBytes(1, m.getBytes()); rs = ps.executeQuery(); boolean found = rs.next(); assert found; @@ -704,6 +704,26 @@ abstract class JdbcDatabase implements Database { } } + public Set getContacts(Connection txn) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT contactId FROM contacts"; + ps = txn.prepareStatement(sql); + rs = ps.executeQuery(); + Set ids = new HashSet(); + while(rs.next()) ids.add(new ContactId(rs.getInt(1))); + rs.close(); + ps.close(); + return ids; + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + protected long getDiskSpace(File f) { long total = 0L; if(f.isDirectory()) { @@ -790,26 +810,6 @@ abstract class JdbcDatabase implements Database { } } - public Set getNeighbours(Connection txn) throws DbException { - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT neighbourId FROM neighbours"; - ps = txn.prepareStatement(sql); - rs = ps.executeQuery(); - Set ids = new HashSet(); - while(rs.next()) ids.add(new NeighbourId(rs.getInt(1))); - rs.close(); - ps.close(); - return ids; - } catch(SQLException e) { - tryToClose(rs); - tryToClose(ps); - tryToClose(txn); - throw new DbException(e); - } - } - public int getNumberOfMessages(Connection txn) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -936,19 +936,19 @@ abstract class JdbcDatabase implements Database { } public Iterable getSendableMessages(Connection txn, - NeighbourId n, long capacity) throws DbException { + ContactId c, long capacity) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { String sql = "SELECT size, messages.messageId FROM messages" - + " JOIN neighbourSubscriptions" - + " ON messages.groupId = neighbourSubscriptions.groupId" + + " JOIN contactSubscriptions" + + " ON messages.groupId = contactSubscriptions.groupId" + " JOIN statuses ON messages.messageId = statuses.messageId" - + " WHERE neighbourSubscriptions.neighbourId = ?" - + " AND statuses.neighbourId = ? AND status = ?"; + + " WHERE contactSubscriptions.contactId = ?" + + " AND statuses.contactId = ? AND status = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); - ps.setInt(2, n.getInt()); + ps.setInt(1, c.getInt()); + ps.setInt(2, c.getInt()); ps.setShort(3, (short) Status.NEW.ordinal()); rs = ps.executeQuery(); List ids = new ArrayList(); @@ -995,27 +995,27 @@ abstract class JdbcDatabase implements Database { } } - public void removeAckedBatch(Connection txn, NeighbourId n, BatchId b) + public void removeAckedBatch(Connection txn, ContactId c, BatchId b) throws DbException { - removeBatch(txn, n, b, Status.SEEN); + removeBatch(txn, c, b, Status.SEEN); } - private void removeBatch(Connection txn, NeighbourId n, BatchId b, + private void removeBatch(Connection txn, ContactId c, BatchId b, Status newStatus) throws DbException { PreparedStatement ps = null, ps1 = null; ResultSet rs = null; try { String sql = "SELECT messageId FROM outstandingMessages" - + " WHERE neighbourId = ? AND batchId = ?"; + + " WHERE contactId = ? AND batchId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); ps.setBytes(2, b.getBytes()); rs = ps.executeQuery(); sql = "UPDATE statuses SET status = ?" - + " WHERE messageId = ? AND neighbourId = ? AND status = ?"; + + " WHERE messageId = ? AND contactId = ? AND status = ?"; ps1 = txn.prepareStatement(sql); ps1.setShort(1, (short) newStatus.ordinal()); - ps1.setInt(3, n.getInt()); + ps1.setInt(3, c.getInt()); ps1.setShort(4, (short) Status.SENT.ordinal()); int messages = 0; while(rs.next()) { @@ -1047,23 +1047,23 @@ abstract class JdbcDatabase implements Database { } } - public Set removeBatchesToAck(Connection txn, NeighbourId n) + public Set removeBatchesToAck(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { String sql = "SELECT batchId FROM batchesToAck" - + " WHERE neighbourId = ?"; + + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); rs = ps.executeQuery(); Set ids = new HashSet(); while(rs.next()) ids.add(new BatchId(rs.getBytes(1))); rs.close(); ps.close(); - sql = "DELETE FROM batchesToAck WHERE neighbourId = ?"; + sql = "DELETE FROM batchesToAck WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setInt(1, c.getInt()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == ids.size(); ps.close(); @@ -1076,17 +1076,13 @@ abstract class JdbcDatabase implements Database { } } - public void removeLostBatch(Connection txn, NeighbourId n, BatchId b) + public void removeContact(Connection txn, ContactId c) throws DbException { - removeBatch(txn, n, b, Status.NEW); - } - - public void removeMessage(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; try { - String sql = "DELETE FROM messages WHERE messageId = ?"; + String sql = "DELETE FROM contacts WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); + ps.setInt(1, c.getInt()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); @@ -1097,13 +1093,17 @@ abstract class JdbcDatabase implements Database { } } - public void removeNeighbour(Connection txn, NeighbourId n) + public void removeLostBatch(Connection txn, ContactId c, BatchId b) throws DbException { + removeBatch(txn, c, b, Status.NEW); + } + + public void removeMessage(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; try { - String sql = "DELETE FROM neighbours WHERE neighbourId = ?"; + String sql = "DELETE FROM messages WHERE messageId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); + ps.setBytes(1, m.getBytes()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); @@ -1194,16 +1194,16 @@ abstract class JdbcDatabase implements Database { } } - public void setStatus(Connection txn, NeighbourId n, MessageId m, Status s) + public void setStatus(Connection txn, ContactId c, MessageId m, Status s) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { String sql = "SELECT status FROM statuses" - + " WHERE messageId = ? AND neighbourId = ?"; + + " WHERE messageId = ? AND contactId = ?"; ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); - ps.setInt(2, n.getInt()); + ps.setInt(2, c.getInt()); rs = ps.executeQuery(); if(rs.next()) { Status old = Status.values()[rs.getByte(1)]; @@ -1213,11 +1213,11 @@ abstract class JdbcDatabase implements Database { ps.close(); if(!old.equals(Status.SEEN) && !old.equals(s)) { sql = "UPDATE statuses SET status = ?" - + " WHERE messageId = ? AND neighbourId = ?"; + + " WHERE messageId = ? AND contactId = ?"; ps = txn.prepareStatement(sql); ps.setShort(1, (short) s.ordinal()); ps.setBytes(2, m.getBytes()); - ps.setInt(3, n.getInt()); + ps.setInt(3, c.getInt()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); @@ -1225,11 +1225,11 @@ abstract class JdbcDatabase implements Database { } else { rs.close(); ps.close(); - sql = "INSERT INTO statuses (messageId, neighbourId, status)" + sql = "INSERT INTO statuses (messageId, contactId, status)" + " VALUES (?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); - ps.setInt(2, n.getInt()); + ps.setInt(2, c.getInt()); ps.setShort(3, (short) s.ordinal()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; diff --git a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java index dc75fe9e0..f6d874fbe 100644 --- a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java +++ b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java @@ -8,7 +8,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import net.sf.briar.api.db.DbException; -import net.sf.briar.api.db.NeighbourId; +import net.sf.briar.api.db.ContactId; import net.sf.briar.api.db.Rating; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; @@ -49,34 +49,6 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { super(db, batchProvider); } - protected void expireMessages(long size) throws DbException { - contactLock.readLock().lock(); - try { - messageLock.writeLock().lock(); - try { - messageStatusLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - for(MessageId m : db.getOldMessages(txn, size)) { - removeMessage(txn, m); - } - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - messageLock.writeLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - public void close() throws DbException { contactLock.writeLock().lock(); try { @@ -106,15 +78,15 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { } } - public void addNeighbour(NeighbourId n) throws DbException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n); + public void addContact(ContactId c) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding contact " + c); contactLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { - db.addNeighbour(txn, n); + db.addContact(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -166,6 +138,164 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { } } + protected void expireMessages(long size) throws DbException { + contactLock.readLock().lock(); + try { + messageLock.writeLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + for(MessageId m : db.getOldMessages(txn, size)) { + removeMessage(txn, m); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + + public void generateBundle(ContactId c, Bundle b) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c); + // Ack all batches received from c + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return; + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + int numAcks = 0; + for(BatchId ack : db.removeBatchesToAck(txn, c)) { + b.addAck(ack); + numAcks++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numAcks + " acks"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + // Add a list of subscriptions + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return; + subscriptionLock.readLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + int numSubs = 0; + for(GroupId g : db.getSubscriptions(txn)) { + b.addSubscription(g); + numSubs++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numSubs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + subscriptionLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + // Add as many messages as possible to the bundle + long capacity = b.getCapacity(); + while(true) { + Batch batch = fillBatch(c, capacity); + if(batch == null) break; // No more messages to send + b.addBatch(batch); + capacity -= batch.getSize(); + // If the batch is less than half full, stop trying - there may be + // more messages trickling in but we can't wait forever + if(batch.getSize() * 2 < Batch.CAPACITY) break; + } + b.seal(); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Bundle sent, " + b.getSize() + " bytes"); + System.gc(); + } + + private Batch fillBatch(ContactId c, long capacity) throws DbException { + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return null; + messageLock.readLock().lock(); + try { + Set sent; + Batch b; + messageStatusLock.readLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + capacity = Math.min(capacity, Batch.CAPACITY); + Iterator it = + db.getSendableMessages(txn, c, capacity).iterator(); + if(!it.hasNext()) { + db.commitTransaction(txn); + return null; // No more messages to send + } + sent = new HashSet(); + b = batchProvider.get(); + while(it.hasNext()) { + MessageId m = it.next(); + b.addMessage(db.getMessage(txn, m)); + sent.add(m); + } + b.seal(); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.readLock().unlock(); + } + // Record the contents of the batch + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + assert !sent.isEmpty(); + db.addOutstandingBatch(txn, c, b.getId(), sent); + db.commitTransaction(txn); + return b; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + public Rating getRating(AuthorId a) throws DbException { ratingLock.readLock().lock(); try { @@ -183,15 +313,200 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { } } - public void removeNeighbour(NeighbourId n) throws DbException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing neighbour " + n); + public Set getSubscriptions() throws DbException { + subscriptionLock.readLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + HashSet subs = new HashSet(); + for(GroupId g : db.getSubscriptions(txn)) subs.add(g); + db.commitTransaction(txn); + return subs; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + subscriptionLock.readLock().unlock(); + } + } + + public void receiveBundle(ContactId c, Bundle b) throws DbException { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received bundle from " + c + ", " + + b.getSize() + " bytes"); + // Mark all messages in acked batches as seen + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return; + messageLock.readLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + int acks = 0; + for(BatchId ack : b.getAcks()) { + acks++; + Txn txn = db.startTransaction(); + try { + db.removeAckedBatch(txn, c, ack); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + acks + " acks"); + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + // Update the contact's subscriptions + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return; + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + db.clearSubscriptions(txn, c); + int subs = 0; + for(GroupId g : b.getSubscriptions()) { + subs++; + db.addSubscription(txn, c, g); + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + subs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().lock(); + } + // Store the messages + int batches = 0; + for(Batch batch : b.getBatches()) { + batches++; + waitForPermissionToWrite(); + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return; + messageLock.writeLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + subscriptionLock.readLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + int received = 0, stored = 0; + for(Message m : batch.getMessages()) { + received++; + GroupId g = m.getGroup(); + if(db.containsSubscription(txn, g)) { + if(storeMessage(txn, m, c)) stored++; + } + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + received + + " messages, stored " + stored); + db.addBatchToAck(txn, c, batch.getId()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + subscriptionLock.readLock().unlock(); + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + batches + " batches"); + // Find any lost batches that need to be retransmitted + Set lost; + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return; + messageLock.readLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + lost = db.addReceivedBundle(txn, c, b.getId()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + for(BatchId batch : lost) { + contactLock.readLock().lock(); + try { + if(!containsContact(c)) return; + messageLock.readLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Removing lost batch"); + db.removeLostBatch(txn, c, batch); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + System.gc(); + } + + public void removeContact(ContactId c) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing contact " + c); contactLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { - db.removeNeighbour(txn, n); + db.removeContact(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -231,24 +546,6 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { } } - public Set getSubscriptions() throws DbException { - subscriptionLock.readLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - HashSet subs = new HashSet(); - for(GroupId g : db.getSubscriptions(txn)) subs.add(g); - db.commitTransaction(txn); - return subs; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - subscriptionLock.readLock().unlock(); - } - } - public void subscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); subscriptionLock.writeLock().lock(); @@ -297,301 +594,4 @@ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { contactLock.readLock().unlock(); } } - - public void generateBundle(NeighbourId n, Bundle b) throws DbException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n); - // Ack all batches received from the neighbour - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return; - messageStatusLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - int numAcks = 0; - for(BatchId ack : db.removeBatchesToAck(txn, n)) { - b.addAck(ack); - numAcks++; - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numAcks + " acks"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - // Add a list of subscriptions - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return; - subscriptionLock.readLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - int numSubs = 0; - for(GroupId g : db.getSubscriptions(txn)) { - b.addSubscription(g); - numSubs++; - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numSubs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - subscriptionLock.readLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - // Add as many messages as possible to the bundle - long capacity = b.getCapacity(); - while(true) { - Batch batch = fillBatch(n, capacity); - if(batch == null) break; // No more messages to send - b.addBatch(batch); - capacity -= batch.getSize(); - // If the batch is less than half full, stop trying - there may be - // more messages trickling in but we can't wait forever - if(batch.getSize() * 2 < Batch.CAPACITY) break; - } - b.seal(); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Bundle sent, " + b.getSize() + " bytes"); - System.gc(); - } - - private Batch fillBatch(NeighbourId n, long capacity) throws DbException { - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return null; - messageLock.readLock().lock(); - try { - Set sent; - Batch b; - messageStatusLock.readLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - capacity = Math.min(capacity, Batch.CAPACITY); - Iterator it = - db.getSendableMessages(txn, n, capacity).iterator(); - if(!it.hasNext()) { - db.commitTransaction(txn); - return null; // No more messages to send - } - sent = new HashSet(); - b = batchProvider.get(); - while(it.hasNext()) { - MessageId m = it.next(); - b.addMessage(db.getMessage(txn, m)); - sent.add(m); - } - b.seal(); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.readLock().unlock(); - } - // Record the contents of the batch - messageStatusLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - assert !sent.isEmpty(); - db.addOutstandingBatch(txn, n, b.getId(), sent); - db.commitTransaction(txn); - return b; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - messageLock.readLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - - public void receiveBundle(NeighbourId n, Bundle b) throws DbException { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received bundle from " + n + ", " - + b.getSize() + " bytes"); - // Mark all messages in acked batches as seen - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return; - messageLock.readLock().lock(); - try { - messageStatusLock.writeLock().lock(); - try { - int acks = 0; - for(BatchId ack : b.getAcks()) { - acks++; - Txn txn = db.startTransaction(); - try { - db.removeAckedBatch(txn, n, ack); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + acks + " acks"); - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - messageLock.readLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - // Update the neighbour's subscriptions - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return; - messageStatusLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - db.clearSubscriptions(txn, n); - int subs = 0; - for(GroupId g : b.getSubscriptions()) { - subs++; - db.addSubscription(txn, n, g); - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + subs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - contactLock.readLock().lock(); - } - // Store the messages - int batches = 0; - for(Batch batch : b.getBatches()) { - batches++; - waitForPermissionToWrite(); - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return; - messageLock.writeLock().lock(); - try { - messageStatusLock.writeLock().lock(); - try { - subscriptionLock.readLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - int received = 0, stored = 0; - for(Message m : batch.getMessages()) { - received++; - GroupId g = m.getGroup(); - if(db.containsSubscription(txn, g)) { - if(storeMessage(txn, m, n)) stored++; - } - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + received - + " messages, stored " + stored); - db.addBatchToAck(txn, n, batch.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - subscriptionLock.readLock().unlock(); - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - messageLock.writeLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + batches + " batches"); - // Find any lost batches that need to be retransmitted - Set lost; - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return; - messageLock.readLock().lock(); - try { - messageStatusLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - lost = db.addReceivedBundle(txn, n, b.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - messageLock.readLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - for(BatchId batch : lost) { - contactLock.readLock().lock(); - try { - if(!containsNeighbour(n)) return; - messageLock.readLock().lock(); - try { - messageStatusLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Removing lost batch"); - db.removeLostBatch(txn, n, batch); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - messageLock.readLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - System.gc(); - } } \ No newline at end of file diff --git a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java index 1008156d1..6e514a2c4 100644 --- a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java +++ b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java @@ -7,7 +7,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import net.sf.briar.api.db.DbException; -import net.sf.briar.api.db.NeighbourId; +import net.sf.briar.api.db.ContactId; import net.sf.briar.api.db.Rating; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; @@ -42,25 +42,6 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { super(db, batchProvider); } - protected void expireMessages(long size) throws DbException { - synchronized(contactLock) { - synchronized(messageLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - for(MessageId m : db.getOldMessages(txn, size)) { - removeMessage(txn, m); - } - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - } - public void close() throws DbException { synchronized(contactLock) { synchronized(messageLock) { @@ -75,13 +56,13 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { } } - public void addNeighbour(NeighbourId n) throws DbException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n); + public void addContact(ContactId c) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding contact " + c); synchronized(contactLock) { synchronized(messageStatusLock) { Txn txn = db.startTransaction(); try { - db.addNeighbour(txn, n); + db.addContact(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -117,6 +98,120 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { } } + protected void expireMessages(long size) throws DbException { + synchronized(contactLock) { + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + for(MessageId m : db.getOldMessages(txn, size)) { + removeMessage(txn, m); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + + public void generateBundle(ContactId c, Bundle b) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c); + // Ack all batches received from c + synchronized(contactLock) { + if(!containsContact(c)) return; + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + int numAcks = 0; + for(BatchId ack : db.removeBatchesToAck(txn, c)) { + b.addAck(ack); + numAcks++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numAcks + " acks"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + // Add a list of subscriptions + synchronized(contactLock) { + if(!containsContact(c)) return; + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + int numSubs = 0; + for(GroupId g : db.getSubscriptions(txn)) { + b.addSubscription(g); + numSubs++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numSubs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + // Add as many messages as possible to the bundle + long capacity = b.getCapacity(); + while(true) { + Batch batch = fillBatch(c, capacity); + if(batch == null) break; // No more messages to send + b.addBatch(batch); + capacity -= batch.getSize(); + // If the batch is less than half full, stop trying - there may be + // more messages trickling in but we can't wait forever + if(batch.getSize() * 2 < Batch.CAPACITY) break; + } + b.seal(); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Bundle sent, " + b.getSize() + " bytes"); + System.gc(); + } + + private Batch fillBatch(ContactId c, long capacity) throws DbException { + synchronized(contactLock) { + if(!containsContact(c)) return null; + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + capacity = Math.min(capacity, Batch.CAPACITY); + Iterator it = + db.getSendableMessages(txn, c, capacity).iterator(); + if(!it.hasNext()) { + db.commitTransaction(txn); + return null; // No more messages to send + } + Batch b = batchProvider.get(); + Set sent = new HashSet(); + while(it.hasNext()) { + MessageId m = it.next(); + b.addMessage(db.getMessage(txn, m)); + sent.add(m); + } + b.seal(); + // Record the contents of the batch + assert !sent.isEmpty(); + db.addOutstandingBatch(txn, c, b.getId(), sent); + db.commitTransaction(txn); + return b; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + public Rating getRating(AuthorId a) throws DbException { synchronized(ratingLock) { Txn txn = db.startTransaction(); @@ -131,6 +226,159 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { } } + public Set getSubscriptions() throws DbException { + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + HashSet subs = new HashSet(); + for(GroupId g : db.getSubscriptions(txn)) subs.add(g); + db.commitTransaction(txn); + return subs; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + + public void receiveBundle(ContactId c, Bundle b) throws DbException { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received bundle from " + c + ", " + + b.getSize() + " bytes"); + // Mark all messages in acked batches as seen + synchronized(contactLock) { + if(!containsContact(c)) return; + synchronized(messageLock) { + synchronized(messageStatusLock) { + int acks = 0; + for(BatchId ack : b.getAcks()) { + acks++; + Txn txn = db.startTransaction(); + try { + db.removeAckedBatch(txn, c, ack); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + acks + " acks"); + } + } + } + // Update the contact's subscriptions + synchronized(contactLock) { + if(!containsContact(c)) return; + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + db.clearSubscriptions(txn, c); + int subs = 0; + for(GroupId g : b.getSubscriptions()) { + subs++; + db.addSubscription(txn, c, g); + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + subs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + // Store the messages + int batches = 0; + for(Batch batch : b.getBatches()) { + batches++; + waitForPermissionToWrite(); + synchronized(contactLock) { + if(!containsContact(c)) return; + synchronized(messageLock) { + synchronized(messageStatusLock) { + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + int received = 0, stored = 0; + for(Message m : batch.getMessages()) { + received++; + GroupId g = m.getGroup(); + if(db.containsSubscription(txn, g)) { + if(storeMessage(txn, m, c)) stored++; + } + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + received + + " messages, stored " + stored); + db.addBatchToAck(txn, c, batch.getId()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + batches + " batches"); + // Find any lost batches that need to be retransmitted + Set lost; + synchronized(contactLock) { + if(!containsContact(c)) return; + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + lost = db.addReceivedBundle(txn, c, b.getId()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + for(BatchId batch : lost) { + synchronized(contactLock) { + if(!containsContact(c)) return; + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Removing lost batch"); + db.removeLostBatch(txn, c, batch); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + System.gc(); + } + + public void removeContact(ContactId c) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing contact " + c); + synchronized(contactLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + db.removeContact(txn, c); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + public void setRating(AuthorId a, Rating r) throws DbException { synchronized(messageLock) { synchronized(ratingLock) { @@ -151,21 +399,6 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { } } - public Set getSubscriptions() throws DbException { - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - HashSet subs = new HashSet(); - for(GroupId g : db.getSubscriptions(txn)) subs.add(g); - db.commitTransaction(txn); - return subs; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - public void subscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); synchronized(subscriptionLock) { @@ -199,237 +432,4 @@ class SynchronizedDatabaseComponent extends DatabaseComponentImpl { } } } - - public void generateBundle(NeighbourId n, Bundle b) throws DbException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n); - // Ack all batches received from the neighbour - synchronized(contactLock) { - if(!containsNeighbour(n)) return; - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - int numAcks = 0; - for(BatchId ack : db.removeBatchesToAck(txn, n)) { - b.addAck(ack); - numAcks++; - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numAcks + " acks"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - // Add a list of subscriptions - synchronized(contactLock) { - if(!containsNeighbour(n)) return; - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - int numSubs = 0; - for(GroupId g : db.getSubscriptions(txn)) { - b.addSubscription(g); - numSubs++; - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numSubs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - // Add as many messages as possible to the bundle - long capacity = b.getCapacity(); - while(true) { - Batch batch = fillBatch(n, capacity); - if(batch == null) break; // No more messages to send - b.addBatch(batch); - capacity -= batch.getSize(); - // If the batch is less than half full, stop trying - there may be - // more messages trickling in but we can't wait forever - if(batch.getSize() * 2 < Batch.CAPACITY) break; - } - b.seal(); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Bundle sent, " + b.getSize() + " bytes"); - System.gc(); - } - - private Batch fillBatch(NeighbourId n, long capacity) throws DbException { - synchronized(contactLock) { - if(!containsNeighbour(n)) return null; - synchronized(messageLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - capacity = Math.min(capacity, Batch.CAPACITY); - Iterator it = - db.getSendableMessages(txn, n, capacity).iterator(); - if(!it.hasNext()) { - db.commitTransaction(txn); - return null; // No more messages to send - } - Batch b = batchProvider.get(); - Set sent = new HashSet(); - while(it.hasNext()) { - MessageId m = it.next(); - b.addMessage(db.getMessage(txn, m)); - sent.add(m); - } - b.seal(); - // Record the contents of the batch - assert !sent.isEmpty(); - db.addOutstandingBatch(txn, n, b.getId(), sent); - db.commitTransaction(txn); - return b; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - } - - public void removeNeighbour(NeighbourId n) throws DbException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing neighbour " + n); - synchronized(contactLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - db.removeNeighbour(txn, n); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - - public void receiveBundle(NeighbourId n, Bundle b) throws DbException { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received bundle from " + n + ", " - + b.getSize() + " bytes"); - // Mark all messages in acked batches as seen - synchronized(contactLock) { - if(!containsNeighbour(n)) return; - synchronized(messageLock) { - synchronized(messageStatusLock) { - int acks = 0; - for(BatchId ack : b.getAcks()) { - acks++; - Txn txn = db.startTransaction(); - try { - db.removeAckedBatch(txn, n, ack); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + acks + " acks"); - } - } - } - // Update the neighbour's subscriptions - synchronized(contactLock) { - if(!containsNeighbour(n)) return; - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - db.clearSubscriptions(txn, n); - int subs = 0; - for(GroupId g : b.getSubscriptions()) { - subs++; - db.addSubscription(txn, n, g); - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + subs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - // Store the messages - int batches = 0; - for(Batch batch : b.getBatches()) { - batches++; - waitForPermissionToWrite(); - synchronized(contactLock) { - if(!containsNeighbour(n)) return; - synchronized(messageLock) { - synchronized(messageStatusLock) { - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - int received = 0, stored = 0; - for(Message m : batch.getMessages()) { - received++; - GroupId g = m.getGroup(); - if(db.containsSubscription(txn, g)) { - if(storeMessage(txn, m, n)) stored++; - } - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + received - + " messages, stored " + stored); - db.addBatchToAck(txn, n, batch.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - } - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + batches + " batches"); - // Find any lost batches that need to be retransmitted - Set lost; - synchronized(contactLock) { - if(!containsNeighbour(n)) return; - synchronized(messageLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - lost = db.addReceivedBundle(txn, n, b.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - for(BatchId batch : lost) { - synchronized(contactLock) { - if(!containsNeighbour(n)) return; - synchronized(messageLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Removing lost batch"); - db.removeLostBatch(txn, n, batch); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - } - System.gc(); - } }