mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-11 18:29:05 +01:00
Changed "neighbour" to "contact" throughout (messy, but it's only going to get messier later). Also reordered some methods in DatabaseComponent impls.
This commit is contained in:
@@ -1,11 +1,11 @@
|
|||||||
package net.sf.briar.api.db;
|
package net.sf.briar.api.db;
|
||||||
|
|
||||||
/** Uniquely identifies a neighbour. */
|
/** Uniquely identifies a contact. */
|
||||||
public class NeighbourId {
|
public class ContactId {
|
||||||
|
|
||||||
private final int id;
|
private final int id;
|
||||||
|
|
||||||
public NeighbourId(int id) {
|
public ContactId(int id) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -15,7 +15,7 @@ public class NeighbourId {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if(o instanceof NeighbourId) return id == ((NeighbourId) o).id;
|
if(o instanceof ContactId) return id == ((ContactId) o).id;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -29,14 +29,14 @@ public interface DatabaseComponent {
|
|||||||
/** Waits for any open transactions to finish and closes the database. */
|
/** Waits for any open transactions to finish and closes the database. */
|
||||||
void close() throws DbException;
|
void close() throws DbException;
|
||||||
|
|
||||||
|
/** Adds a new contact to the database. */
|
||||||
|
void addContact(ContactId c) throws DbException;
|
||||||
|
|
||||||
/** Adds a locally generated message to the database. */
|
/** Adds a locally generated message to the database. */
|
||||||
void addLocallyGeneratedMessage(Message m) throws DbException;
|
void addLocallyGeneratedMessage(Message m) throws DbException;
|
||||||
|
|
||||||
/** Adds a new neighbour to the database. */
|
/** Generates a bundle of messages for the given contact. */
|
||||||
void addNeighbour(NeighbourId n) throws DbException;
|
void generateBundle(ContactId c, Bundle b) throws DbException;
|
||||||
|
|
||||||
/** Generates a bundle of messages for the given neighbour. */
|
|
||||||
void generateBundle(NeighbourId n, Bundle b) throws DbException;
|
|
||||||
|
|
||||||
/** Returns the user's rating for the given author. */
|
/** Returns the user's rating for the given author. */
|
||||||
Rating getRating(AuthorId a) throws DbException;
|
Rating getRating(AuthorId a) throws DbException;
|
||||||
@@ -45,13 +45,13 @@ public interface DatabaseComponent {
|
|||||||
Set<GroupId> getSubscriptions() throws DbException;
|
Set<GroupId> getSubscriptions() throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processes a bundle of messages received from the given neighbour. Some
|
* Processes a bundle of messages received from the given contact. Some
|
||||||
* or all of the messages in the bundle may be stored.
|
* or all of the messages in the bundle may be stored.
|
||||||
*/
|
*/
|
||||||
void receiveBundle(NeighbourId n, Bundle b) throws DbException;
|
void receiveBundle(ContactId c, Bundle b) throws DbException;
|
||||||
|
|
||||||
/** Removes a neighbour (and all associated state) from the database. */
|
/** Removes a contact (and all associated state) from the database. */
|
||||||
void removeNeighbour(NeighbourId n) throws DbException;
|
void removeContact(ContactId c) throws DbException;
|
||||||
|
|
||||||
/** Records the user's rating for the given author. */
|
/** Records the user's rating for the given author. */
|
||||||
void setRating(AuthorId a, Rating r) throws DbException;
|
void setRating(AuthorId a, Rating r) throws DbException;
|
||||||
|
|||||||
@@ -1,17 +1,11 @@
|
|||||||
package net.sf.briar.api.db;
|
package net.sf.briar.api.db;
|
||||||
|
|
||||||
/** The status of a message with respect to a neighbour. */
|
/** The status of a message with respect to a particular contact. */
|
||||||
public enum Status {
|
public enum Status {
|
||||||
/**
|
/** The message has not been sent, received, or acked. */
|
||||||
* The message has not been sent to, received from, or acked by the
|
|
||||||
* neighbour.
|
|
||||||
*/
|
|
||||||
NEW,
|
NEW,
|
||||||
/**
|
/** The message has been sent, but not received or acked. */
|
||||||
* The message has been sent to, but not received from or acked by, the
|
|
||||||
* neighbour.
|
|
||||||
*/
|
|
||||||
SENT,
|
SENT,
|
||||||
/** The message has been received from or acked by the neighbour. */
|
/** The message has been received or acked. */
|
||||||
SEEN
|
SEEN
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package net.sf.briar.db;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import net.sf.briar.api.db.DbException;
|
import net.sf.briar.api.db.DbException;
|
||||||
import net.sf.briar.api.db.NeighbourId;
|
import net.sf.briar.api.db.ContactId;
|
||||||
import net.sf.briar.api.db.Rating;
|
import net.sf.briar.api.db.Rating;
|
||||||
import net.sf.briar.api.db.Status;
|
import net.sf.briar.api.db.Status;
|
||||||
import net.sf.briar.api.protocol.AuthorId;
|
import net.sf.briar.api.protocol.AuthorId;
|
||||||
@@ -62,7 +62,7 @@ interface Database<T> {
|
|||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messageStatuses write.
|
* Locking: contacts read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void addBatchToAck(T txn, NeighbourId n, BatchId b) throws DbException;
|
void addBatchToAck(T txn, ContactId c, BatchId b) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns false if the given message is already in the database. Otherwise
|
* Returns false if the given message is already in the database. Otherwise
|
||||||
@@ -73,18 +73,18 @@ interface Database<T> {
|
|||||||
boolean addMessage(T txn, Message m) throws DbException;
|
boolean addMessage(T txn, Message m) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a new neighbour to the database.
|
* Adds a new contact to the database.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts write, messageStatuses write.
|
* Locking: contacts write, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void addNeighbour(T txn, NeighbourId n) throws DbException;
|
void addContact(T txn, ContactId c) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Records a sent batch as needing to be acknowledged.
|
* Records a sent batch as needing to be acknowledged.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messages read, messageStatuses write.
|
* Locking: contacts read, messages read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void addOutstandingBatch(T txn, NeighbourId n, BatchId b, Set<MessageId> sent) throws DbException;
|
void addOutstandingBatch(T txn, ContactId c, BatchId b, Set<MessageId> sent) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Records a received bundle. This should be called after processing the
|
* Records a received bundle. This should be called after processing the
|
||||||
@@ -93,7 +93,7 @@ interface Database<T> {
|
|||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messages read, messageStatuses write.
|
* Locking: contacts read, messages read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
Set<BatchId> addReceivedBundle(T txn, NeighbourId n, BundleId b) throws DbException;
|
Set<BatchId> addReceivedBundle(T txn, ContactId c, BundleId b) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribes to the given group.
|
* Subscribes to the given group.
|
||||||
@@ -103,18 +103,25 @@ interface Database<T> {
|
|||||||
void addSubscription(T txn, GroupId g) throws DbException;
|
void addSubscription(T txn, GroupId g) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Records a neighbour's subscription to a group.
|
* Records a contact's subscription to a group.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messageStatuses write.
|
* Locking: contacts read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void addSubscription(T txn, NeighbourId n, GroupId g) throws DbException;
|
void addSubscription(T txn, ContactId c, GroupId g) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes all recorded subscriptions for the given neighbour.
|
* Removes all recorded subscriptions for the given contact.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messageStatuses write.
|
* Locking: contacts read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void clearSubscriptions(T txn, NeighbourId n) throws DbException;
|
void clearSubscriptions(T txn, ContactId c) throws DbException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true iff the database contains the given contact.
|
||||||
|
* <p>
|
||||||
|
* Locking: contacts read.
|
||||||
|
*/
|
||||||
|
boolean containsContact(T txn, ContactId c) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true iff the database contains the given message.
|
* Returns true iff the database contains the given message.
|
||||||
@@ -123,13 +130,6 @@ interface Database<T> {
|
|||||||
*/
|
*/
|
||||||
boolean containsMessage(T txn, MessageId m) throws DbException;
|
boolean containsMessage(T txn, MessageId m) throws DbException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns true iff the database contains the given neighbour.
|
|
||||||
* <p>
|
|
||||||
* Locking: contacts read.
|
|
||||||
*/
|
|
||||||
boolean containsNeighbour(T txn, NeighbourId n) throws DbException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true iff the user is subscribed to the given group.
|
* Returns true iff the user is subscribed to the given group.
|
||||||
* <p>
|
* <p>
|
||||||
@@ -137,6 +137,13 @@ interface Database<T> {
|
|||||||
*/
|
*/
|
||||||
boolean containsSubscription(T txn, GroupId g) throws DbException;
|
boolean containsSubscription(T txn, GroupId g) throws DbException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the IDs of all contacts.
|
||||||
|
* <p>
|
||||||
|
* Locking: contacts read, messageStatuses read.
|
||||||
|
*/
|
||||||
|
Set<ContactId> getContacts(T txn) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the amount of free storage space available to the database, in
|
* Returns the amount of free storage space available to the database, in
|
||||||
* bytes. This is based on the minimum of the space available on the device
|
* bytes. This is based on the minimum of the space available on the device
|
||||||
@@ -168,13 +175,6 @@ interface Database<T> {
|
|||||||
*/
|
*/
|
||||||
Iterable<MessageId> getMessagesByParent(T txn, MessageId m) throws DbException;
|
Iterable<MessageId> getMessagesByParent(T txn, MessageId m) throws DbException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the IDs of all neighbours.
|
|
||||||
* <p>
|
|
||||||
* Locking: contacts read, messageStatuses read.
|
|
||||||
*/
|
|
||||||
Set<NeighbourId> getNeighbours(T txn) throws DbException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the IDs of the oldest messages in the database, with a total
|
* Returns the IDs of the oldest messages in the database, with a total
|
||||||
* size less than or equal to the given size.
|
* size less than or equal to the given size.
|
||||||
@@ -200,7 +200,7 @@ interface Database<T> {
|
|||||||
/**
|
/**
|
||||||
* Returns the sendability score of the given message. Messages with
|
* Returns the sendability score of the given message. Messages with
|
||||||
* sendability scores greater than zero are eligible to be sent to
|
* sendability scores greater than zero are eligible to be sent to
|
||||||
* neighbours.
|
* contacts.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: messages read.
|
* Locking: messages read.
|
||||||
*/
|
*/
|
||||||
@@ -208,11 +208,11 @@ interface Database<T> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the IDs of some messages that are eligible to be sent to the
|
* Returns the IDs of some messages that are eligible to be sent to the
|
||||||
* given neighbour, with a total size less than or equal to the given size.
|
* given contact, with a total size less than or equal to the given size.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messages read, messageStatuses read.
|
* Locking: contacts read, messages read, messageStatuses read.
|
||||||
*/
|
*/
|
||||||
Iterable<MessageId> getSendableMessages(T txn, NeighbourId n, long capacity) throws DbException;
|
Iterable<MessageId> getSendableMessages(T txn, ContactId c, long capacity) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the groups to which the user subscribes.
|
* Returns the groups to which the user subscribes.
|
||||||
@@ -224,28 +224,35 @@ interface Database<T> {
|
|||||||
/**
|
/**
|
||||||
* Removes an outstanding batch that has been acknowledged. Any messages in
|
* Removes an outstanding batch that has been acknowledged. Any messages in
|
||||||
* the batch that are still considered outstanding (Status.SENT) with
|
* the batch that are still considered outstanding (Status.SENT) with
|
||||||
* respect to the given neighbour are now considered seen (Status.SEEN).
|
* respect to the given contact are now considered seen (Status.SEEN).
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messages read, messageStatuses write.
|
* Locking: contacts read, messages read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void removeAckedBatch(T txn, NeighbourId n, BatchId b) throws DbException;
|
void removeAckedBatch(T txn, ContactId c, BatchId b) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes and returns the IDs of any batches received from the given
|
* Removes and returns the IDs of any batches received from the given
|
||||||
* neighbour that need to be acknowledged.
|
* contact that need to be acknowledged.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messageStatuses write.
|
* Locking: contacts read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
Set<BatchId> removeBatchesToAck(T txn, NeighbourId n) throws DbException;
|
Set<BatchId> removeBatchesToAck(T txn, ContactId c) throws DbException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a contact (and all associated state) from the database.
|
||||||
|
* <p>
|
||||||
|
* Locking: contacts write, messageStatuses write.
|
||||||
|
*/
|
||||||
|
void removeContact(T txn, ContactId c) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes an outstanding batch that has been lost. Any messages in the
|
* Removes an outstanding batch that has been lost. Any messages in the
|
||||||
* batch that are still considered outstanding (Status.SENT) with respect
|
* batch that are still considered outstanding (Status.SENT) with respect
|
||||||
* to the given neighbour are now considered unsent (Status.NEW).
|
* to the given contact are now considered unsent (Status.NEW).
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messages read, messageStatuses write.
|
* Locking: contacts read, messages read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void removeLostBatch(T txn, NeighbourId n, BatchId b) throws DbException;
|
void removeLostBatch(T txn, ContactId c, BatchId b) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes a message (and all associated state) from the database.
|
* Removes a message (and all associated state) from the database.
|
||||||
@@ -254,13 +261,6 @@ interface Database<T> {
|
|||||||
*/
|
*/
|
||||||
void removeMessage(T txn, MessageId m) throws DbException;
|
void removeMessage(T txn, MessageId m) throws DbException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a neighbour (and all associated state) from the database.
|
|
||||||
* <p>
|
|
||||||
* Locking: contacts write, messageStatuses write.
|
|
||||||
*/
|
|
||||||
void removeNeighbour(T txn, NeighbourId n) throws DbException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unsubscribes from the given group. Any messages belonging to the group
|
* Unsubscribes from the given group. Any messages belonging to the group
|
||||||
* are deleted from the database.
|
* are deleted from the database.
|
||||||
@@ -285,9 +285,9 @@ interface Database<T> {
|
|||||||
void setSendability(T txn, MessageId m, int sendability) throws DbException;
|
void setSendability(T txn, MessageId m, int sendability) throws DbException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the status of the given message with respect to the given neighbour.
|
* Sets the status of the given message with respect to the given contact.
|
||||||
* <p>
|
* <p>
|
||||||
* Locking: contacts read, messages read, messageStatuses write.
|
* Locking: contacts read, messages read, messageStatuses write.
|
||||||
*/
|
*/
|
||||||
void setStatus(T txn, NeighbourId n, MessageId m, Status s) throws DbException;
|
void setStatus(T txn, ContactId c, MessageId m, Status s) throws DbException;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import java.util.logging.Logger;
|
|||||||
|
|
||||||
import net.sf.briar.api.db.DatabaseComponent;
|
import net.sf.briar.api.db.DatabaseComponent;
|
||||||
import net.sf.briar.api.db.DbException;
|
import net.sf.briar.api.db.DbException;
|
||||||
import net.sf.briar.api.db.NeighbourId;
|
import net.sf.briar.api.db.ContactId;
|
||||||
import net.sf.briar.api.db.Rating;
|
import net.sf.briar.api.db.Rating;
|
||||||
import net.sf.briar.api.db.Status;
|
import net.sf.briar.api.db.Status;
|
||||||
import net.sf.briar.api.protocol.AuthorId;
|
import net.sf.briar.api.protocol.AuthorId;
|
||||||
@@ -75,10 +75,10 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Locking: contacts read
|
// Locking: contacts read
|
||||||
protected boolean containsNeighbour(NeighbourId n) throws DbException {
|
protected boolean containsContact(ContactId c) throws DbException {
|
||||||
Txn txn = db.startTransaction();
|
Txn txn = db.startTransaction();
|
||||||
try {
|
try {
|
||||||
boolean contains = db.containsNeighbour(txn, n);
|
boolean contains = db.containsContact(txn, c);
|
||||||
db.commitTransaction(txn);
|
db.commitTransaction(txn);
|
||||||
return contains;
|
return contains;
|
||||||
} catch(DbException e) {
|
} catch(DbException e) {
|
||||||
@@ -141,16 +141,16 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Locking: contacts read, messages write, messageStatuses write
|
// Locking: contacts read, messages write, messageStatuses write
|
||||||
protected boolean storeMessage(Txn txn, Message m, NeighbourId sender)
|
protected boolean storeMessage(Txn txn, Message m, ContactId sender)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
boolean added = db.addMessage(txn, m);
|
boolean added = db.addMessage(txn, m);
|
||||||
// Mark the message as seen by the sender
|
// Mark the message as seen by the sender
|
||||||
MessageId id = m.getId();
|
MessageId id = m.getId();
|
||||||
if(sender != null) db.setStatus(txn, sender, id, Status.SEEN);
|
if(sender != null) db.setStatus(txn, sender, id, Status.SEEN);
|
||||||
if(added) {
|
if(added) {
|
||||||
// Mark the message as unseen by other neighbours
|
// Mark the message as unseen by other contacts
|
||||||
for(NeighbourId n : db.getNeighbours(txn)) {
|
for(ContactId c : db.getContacts(txn)) {
|
||||||
if(!n.equals(sender)) db.setStatus(txn, n, id, Status.NEW);
|
if(!c.equals(sender)) db.setStatus(txn, c, id, Status.NEW);
|
||||||
}
|
}
|
||||||
// Calculate and store the message's sendability
|
// Calculate and store the message's sendability
|
||||||
int sendability = calculateSendability(txn, m);
|
int sendability = calculateSendability(txn, m);
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ import java.util.logging.Logger;
|
|||||||
|
|
||||||
import net.sf.briar.api.db.DatabaseComponent;
|
import net.sf.briar.api.db.DatabaseComponent;
|
||||||
import net.sf.briar.api.db.DbException;
|
import net.sf.briar.api.db.DbException;
|
||||||
import net.sf.briar.api.db.NeighbourId;
|
import net.sf.briar.api.db.ContactId;
|
||||||
import net.sf.briar.api.db.Rating;
|
import net.sf.briar.api.db.Rating;
|
||||||
import net.sf.briar.api.db.Status;
|
import net.sf.briar.api.db.Status;
|
||||||
import net.sf.briar.api.protocol.AuthorId;
|
import net.sf.briar.api.protocol.AuthorId;
|
||||||
@@ -64,46 +64,46 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
private static final String INDEX_MESSAGES_BY_SENDABILITY =
|
private static final String INDEX_MESSAGES_BY_SENDABILITY =
|
||||||
"CREATE INDEX messagesBySendability ON messages (sendability)";
|
"CREATE INDEX messagesBySendability ON messages (sendability)";
|
||||||
|
|
||||||
private static final String CREATE_NEIGHBOURS =
|
private static final String CREATE_CONTACTS =
|
||||||
"CREATE TABLE neighbours"
|
"CREATE TABLE contacts"
|
||||||
+ " (neighbourId INT NOT NULL,"
|
+ " (contactId INT NOT NULL,"
|
||||||
+ " lastBundleReceived XXXX NOT NULL,"
|
+ " lastBundleReceived XXXX NOT NULL,"
|
||||||
+ " PRIMARY KEY (neighbourId))";
|
+ " PRIMARY KEY (contactId))";
|
||||||
|
|
||||||
private static final String CREATE_BATCHES_TO_ACK =
|
private static final String CREATE_BATCHES_TO_ACK =
|
||||||
"CREATE TABLE batchesToAck"
|
"CREATE TABLE batchesToAck"
|
||||||
+ " (batchId XXXX NOT NULL,"
|
+ " (batchId XXXX NOT NULL,"
|
||||||
+ " neighbourId INT NOT NULL,"
|
+ " contactId INT NOT NULL,"
|
||||||
+ " PRIMARY KEY (batchId),"
|
+ " PRIMARY KEY (batchId),"
|
||||||
+ " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)"
|
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
|
||||||
+ " ON DELETE CASCADE)";
|
+ " ON DELETE CASCADE)";
|
||||||
|
|
||||||
private static final String CREATE_NEIGHBOUR_SUBSCRIPTIONS =
|
private static final String CREATE_CONTACT_SUBSCRIPTIONS =
|
||||||
"CREATE TABLE neighbourSubscriptions"
|
"CREATE TABLE contactSubscriptions"
|
||||||
+ " (neighbourId INT NOT NULL,"
|
+ " (contactId INT NOT NULL,"
|
||||||
+ " groupId XXXX NOT NULL,"
|
+ " groupId XXXX NOT NULL,"
|
||||||
+ " PRIMARY KEY (neighbourId, groupId),"
|
+ " PRIMARY KEY (contactId, groupId),"
|
||||||
+ " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)"
|
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
|
||||||
+ " ON DELETE CASCADE)";
|
+ " ON DELETE CASCADE)";
|
||||||
|
|
||||||
private static final String CREATE_OUTSTANDING_BATCHES =
|
private static final String CREATE_OUTSTANDING_BATCHES =
|
||||||
"CREATE TABLE outstandingBatches"
|
"CREATE TABLE outstandingBatches"
|
||||||
+ " (batchId XXXX NOT NULL,"
|
+ " (batchId XXXX NOT NULL,"
|
||||||
+ " neighbourId INT NOT NULL,"
|
+ " contactId INT NOT NULL,"
|
||||||
+ " lastBundleReceived XXXX NOT NULL,"
|
+ " lastBundleReceived XXXX NOT NULL,"
|
||||||
+ " PRIMARY KEY (batchId),"
|
+ " PRIMARY KEY (batchId),"
|
||||||
+ " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)"
|
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
|
||||||
+ " ON DELETE CASCADE)";
|
+ " ON DELETE CASCADE)";
|
||||||
|
|
||||||
private static final String CREATE_OUTSTANDING_MESSAGES =
|
private static final String CREATE_OUTSTANDING_MESSAGES =
|
||||||
"CREATE TABLE outstandingMessages"
|
"CREATE TABLE outstandingMessages"
|
||||||
+ " (batchId XXXX NOT NULL,"
|
+ " (batchId XXXX NOT NULL,"
|
||||||
+ " neighbourId INT NOT NULL,"
|
+ " contactId INT NOT NULL,"
|
||||||
+ " messageId XXXX NOT NULL,"
|
+ " messageId XXXX NOT NULL,"
|
||||||
+ " PRIMARY KEY (batchId, messageId),"
|
+ " PRIMARY KEY (batchId, messageId),"
|
||||||
+ " FOREIGN KEY (batchId) REFERENCES outstandingBatches (batchId)"
|
+ " FOREIGN KEY (batchId) REFERENCES outstandingBatches (batchId)"
|
||||||
+ " ON DELETE CASCADE,"
|
+ " ON DELETE CASCADE,"
|
||||||
+ " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)"
|
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
|
||||||
+ " ON DELETE CASCADE,"
|
+ " ON DELETE CASCADE,"
|
||||||
+ " FOREIGN KEY (messageId) REFERENCES messages (messageId)"
|
+ " FOREIGN KEY (messageId) REFERENCES messages (messageId)"
|
||||||
+ " ON DELETE CASCADE)";
|
+ " ON DELETE CASCADE)";
|
||||||
@@ -121,28 +121,28 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
private static final String CREATE_RECEIVED_BUNDLES =
|
private static final String CREATE_RECEIVED_BUNDLES =
|
||||||
"CREATE TABLE receivedBundles"
|
"CREATE TABLE receivedBundles"
|
||||||
+ " (bundleId XXXX NOT NULL,"
|
+ " (bundleId XXXX NOT NULL,"
|
||||||
+ " neighbourId INT NOT NULL,"
|
+ " contactId INT NOT NULL,"
|
||||||
+ " timestamp BIGINT NOT NULL,"
|
+ " timestamp BIGINT NOT NULL,"
|
||||||
+ " PRIMARY KEY (bundleId),"
|
+ " PRIMARY KEY (bundleId),"
|
||||||
+ " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)"
|
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
|
||||||
+ " ON DELETE CASCADE)";
|
+ " ON DELETE CASCADE)";
|
||||||
|
|
||||||
private static final String CREATE_STATUSES =
|
private static final String CREATE_STATUSES =
|
||||||
"CREATE TABLE statuses"
|
"CREATE TABLE statuses"
|
||||||
+ " (messageId XXXX NOT NULL,"
|
+ " (messageId XXXX NOT NULL,"
|
||||||
+ " neighbourId INT NOT NULL,"
|
+ " contactId INT NOT NULL,"
|
||||||
+ " status SMALLINT NOT NULL,"
|
+ " status SMALLINT NOT NULL,"
|
||||||
+ " PRIMARY KEY (messageId, neighbourId),"
|
+ " PRIMARY KEY (messageId, contactId),"
|
||||||
+ " FOREIGN KEY (messageId) REFERENCES messages (messageId)"
|
+ " FOREIGN KEY (messageId) REFERENCES messages (messageId)"
|
||||||
+ " ON DELETE CASCADE,"
|
+ " ON DELETE CASCADE,"
|
||||||
+ " FOREIGN KEY (neighbourId) REFERENCES neighbours (neighbourId)"
|
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
|
||||||
+ " ON DELETE CASCADE)";
|
+ " ON DELETE CASCADE)";
|
||||||
|
|
||||||
private static final String INDEX_STATUSES_BY_MESSAGE =
|
private static final String INDEX_STATUSES_BY_MESSAGE =
|
||||||
"CREATE INDEX statusesByMessage ON statuses (messageId)";
|
"CREATE INDEX statusesByMessage ON statuses (messageId)";
|
||||||
|
|
||||||
private static final String INDEX_STATUSES_BY_NEIGHBOUR =
|
private static final String INDEX_STATUSES_BY_CONTACT =
|
||||||
"CREATE INDEX statusesByNeighbour ON statuses (neighbourId)";
|
"CREATE INDEX statusesByContact ON statuses (contactId)";
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
Logger.getLogger(JdbcDatabase.class.getName());
|
Logger.getLogger(JdbcDatabase.class.getName());
|
||||||
@@ -207,14 +207,14 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
s.executeUpdate(INDEX_MESSAGES_BY_TIMESTAMP);
|
s.executeUpdate(INDEX_MESSAGES_BY_TIMESTAMP);
|
||||||
s.executeUpdate(INDEX_MESSAGES_BY_SENDABILITY);
|
s.executeUpdate(INDEX_MESSAGES_BY_SENDABILITY);
|
||||||
if(LOG.isLoggable(Level.FINE))
|
if(LOG.isLoggable(Level.FINE))
|
||||||
LOG.fine("Creating neighbours table");
|
LOG.fine("Creating contacts table");
|
||||||
s.executeUpdate(insertHashType(CREATE_NEIGHBOURS));
|
s.executeUpdate(insertHashType(CREATE_CONTACTS));
|
||||||
if(LOG.isLoggable(Level.FINE))
|
if(LOG.isLoggable(Level.FINE))
|
||||||
LOG.fine("Creating batchesToAck table");
|
LOG.fine("Creating batchesToAck table");
|
||||||
s.executeUpdate(insertHashType(CREATE_BATCHES_TO_ACK));
|
s.executeUpdate(insertHashType(CREATE_BATCHES_TO_ACK));
|
||||||
if(LOG.isLoggable(Level.FINE))
|
if(LOG.isLoggable(Level.FINE))
|
||||||
LOG.fine("Creating neighbourSubscriptions table");
|
LOG.fine("Creating contactSubscriptions table");
|
||||||
s.executeUpdate(insertHashType(CREATE_NEIGHBOUR_SUBSCRIPTIONS));
|
s.executeUpdate(insertHashType(CREATE_CONTACT_SUBSCRIPTIONS));
|
||||||
if(LOG.isLoggable(Level.FINE))
|
if(LOG.isLoggable(Level.FINE))
|
||||||
LOG.fine("Creating outstandingBatches table");
|
LOG.fine("Creating outstandingBatches table");
|
||||||
s.executeUpdate(insertHashType(CREATE_OUTSTANDING_BATCHES));
|
s.executeUpdate(insertHashType(CREATE_OUTSTANDING_BATCHES));
|
||||||
@@ -232,7 +232,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
LOG.fine("Creating statuses table");
|
LOG.fine("Creating statuses table");
|
||||||
s.executeUpdate(insertHashType(CREATE_STATUSES));
|
s.executeUpdate(insertHashType(CREATE_STATUSES));
|
||||||
s.executeUpdate(INDEX_STATUSES_BY_MESSAGE);
|
s.executeUpdate(INDEX_STATUSES_BY_MESSAGE);
|
||||||
s.executeUpdate(INDEX_STATUSES_BY_NEIGHBOUR);
|
s.executeUpdate(INDEX_STATUSES_BY_CONTACT);
|
||||||
s.close();
|
s.close();
|
||||||
} catch(SQLException e) {
|
} catch(SQLException e) {
|
||||||
tryToClose(s);
|
tryToClose(s);
|
||||||
@@ -340,16 +340,35 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addBatchToAck(Connection txn, NeighbourId n, BatchId b)
|
public void addBatchToAck(Connection txn, ContactId c, BatchId b)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
try {
|
try {
|
||||||
String sql = "INSERT INTO batchesToAck"
|
String sql = "INSERT INTO batchesToAck"
|
||||||
+ " (batchId, neighbourId)"
|
+ " (batchId, contactId)"
|
||||||
+ " VALUES (?, ?)";
|
+ " VALUES (?, ?)";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, b.getBytes());
|
ps.setBytes(1, b.getBytes());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
|
int rowsAffected = ps.executeUpdate();
|
||||||
|
assert rowsAffected == 1;
|
||||||
|
ps.close();
|
||||||
|
} catch(SQLException e) {
|
||||||
|
tryToClose(ps);
|
||||||
|
tryToClose(txn);
|
||||||
|
throw new DbException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addContact(Connection txn, ContactId c) throws DbException {
|
||||||
|
PreparedStatement ps = null;
|
||||||
|
try {
|
||||||
|
String sql = "INSERT INTO contacts"
|
||||||
|
+ " (contactId, lastBundleReceived)"
|
||||||
|
+ " VALUES (?, ?)";
|
||||||
|
ps = txn.prepareStatement(sql);
|
||||||
|
ps.setInt(1, c.getInt());
|
||||||
|
ps.setBytes(2, BundleId.NONE.getBytes());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
ps.close();
|
ps.close();
|
||||||
@@ -388,35 +407,16 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addNeighbour(Connection txn, NeighbourId n) throws DbException {
|
public void addOutstandingBatch(Connection txn, ContactId c, BatchId b,
|
||||||
PreparedStatement ps = null;
|
|
||||||
try {
|
|
||||||
String sql = "INSERT INTO neighbours"
|
|
||||||
+ " (neighbourId, lastBundleReceived)"
|
|
||||||
+ " VALUES (?, ?)";
|
|
||||||
ps = txn.prepareStatement(sql);
|
|
||||||
ps.setInt(1, n.getInt());
|
|
||||||
ps.setBytes(2, BundleId.NONE.getBytes());
|
|
||||||
int rowsAffected = ps.executeUpdate();
|
|
||||||
assert rowsAffected == 1;
|
|
||||||
ps.close();
|
|
||||||
} catch(SQLException e) {
|
|
||||||
tryToClose(ps);
|
|
||||||
tryToClose(txn);
|
|
||||||
throw new DbException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addOutstandingBatch(Connection txn, NeighbourId n, BatchId b,
|
|
||||||
Set<MessageId> sent) throws DbException {
|
Set<MessageId> sent) throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
// Find the ID of the last bundle received from n
|
// Find the ID of the last bundle received from c
|
||||||
String sql = "SELECT lastBundleReceived FROM neighbours"
|
String sql = "SELECT lastBundleReceived FROM contacts"
|
||||||
+ " WHERE neighbourId = ?";
|
+ " WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
boolean found = rs.next();
|
boolean found = rs.next();
|
||||||
assert found;
|
assert found;
|
||||||
@@ -427,22 +427,22 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
ps.close();
|
ps.close();
|
||||||
// Create an outstanding batch row
|
// Create an outstanding batch row
|
||||||
sql = "INSERT INTO outstandingBatches"
|
sql = "INSERT INTO outstandingBatches"
|
||||||
+ " (batchId, neighbourId, lastBundleReceived)"
|
+ " (batchId, contactId, lastBundleReceived)"
|
||||||
+ " VALUES (?, ?, ?)";
|
+ " VALUES (?, ?, ?)";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, b.getBytes());
|
ps.setBytes(1, b.getBytes());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
ps.setBytes(3, lastBundleReceived);
|
ps.setBytes(3, lastBundleReceived);
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
ps.close();
|
ps.close();
|
||||||
// Create an outstanding message row for each message in the batch
|
// Create an outstanding message row for each message in the batch
|
||||||
sql = "INSERT INTO outstandingMessages"
|
sql = "INSERT INTO outstandingMessages"
|
||||||
+ " (batchId, neighbourId, messageId)"
|
+ " (batchId, contactId, messageId)"
|
||||||
+ " VALUES (?, ?, ?)";
|
+ " VALUES (?, ?, ?)";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, b.getBytes());
|
ps.setBytes(1, b.getBytes());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
for(MessageId m : sent) {
|
for(MessageId m : sent) {
|
||||||
ps.setBytes(3, m.getBytes());
|
ps.setBytes(3, m.getBytes());
|
||||||
ps.addBatch();
|
ps.addBatch();
|
||||||
@@ -455,10 +455,10 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
ps.close();
|
ps.close();
|
||||||
// Set the status of each message in the batch to SENT
|
// Set the status of each message in the batch to SENT
|
||||||
sql = "UPDATE statuses SET status = ?"
|
sql = "UPDATE statuses SET status = ?"
|
||||||
+ " WHERE messageId = ? AND neighbourId = ? AND status = ?";
|
+ " WHERE messageId = ? AND contactId = ? AND status = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setShort(1, (short) Status.SENT.ordinal());
|
ps.setShort(1, (short) Status.SENT.ordinal());
|
||||||
ps.setInt(3, n.getInt());
|
ps.setInt(3, c.getInt());
|
||||||
ps.setShort(4, (short) Status.NEW.ordinal());
|
ps.setShort(4, (short) Status.NEW.ordinal());
|
||||||
for(MessageId m : sent) {
|
for(MessageId m : sent) {
|
||||||
ps.setBytes(2, m.getBytes());
|
ps.setBytes(2, m.getBytes());
|
||||||
@@ -478,25 +478,25 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<BatchId> addReceivedBundle(Connection txn, NeighbourId n,
|
public Set<BatchId> addReceivedBundle(Connection txn, ContactId c,
|
||||||
BundleId b) throws DbException {
|
BundleId b) throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
// Update the ID of the last bundle received from n
|
// Update the ID of the last bundle received from c
|
||||||
String sql = "UPDATE neighbours SET lastBundleReceived = ?"
|
String sql = "UPDATE contacts SET lastBundleReceived = ?"
|
||||||
+ " WHERE neighbourId = ?";
|
+ " WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, b.getBytes());
|
ps.setBytes(1, b.getBytes());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
ps.close();
|
ps.close();
|
||||||
// Count the received bundle records for n and find the oldest
|
// Count the received bundle records for c and find the oldest
|
||||||
sql = "SELECT bundleId, timestamp FROM receivedBundles"
|
sql = "SELECT bundleId, timestamp FROM receivedBundles"
|
||||||
+ " WHERE neighbourId = ?";
|
+ " WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
int received = 0;
|
int received = 0;
|
||||||
long oldestTimestamp = Long.MAX_VALUE;
|
long oldestTimestamp = Long.MAX_VALUE;
|
||||||
@@ -516,7 +516,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
if(received == DatabaseComponent.RETRANSMIT_THRESHOLD) {
|
if(received == DatabaseComponent.RETRANSMIT_THRESHOLD) {
|
||||||
// Expire batches related to the oldest received bundle
|
// Expire batches related to the oldest received bundle
|
||||||
assert oldestBundle != null;
|
assert oldestBundle != null;
|
||||||
lost = findLostBatches(txn, n, oldestBundle);
|
lost = findLostBatches(txn, c, oldestBundle);
|
||||||
sql = "DELETE FROM receivedBundles WHERE bundleId = ?";
|
sql = "DELETE FROM receivedBundles WHERE bundleId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, oldestBundle);
|
ps.setBytes(1, oldestBundle);
|
||||||
@@ -528,11 +528,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
// Record the new received bundle
|
// Record the new received bundle
|
||||||
sql = "INSERT INTO receivedBundles"
|
sql = "INSERT INTO receivedBundles"
|
||||||
+ " (bundleId, neighbourId, timestamp)"
|
+ " (bundleId, contactId, timestamp)"
|
||||||
+ " VALUES (?, ?, ?)";
|
+ " VALUES (?, ?, ?)";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, b.getBytes());
|
ps.setBytes(1, b.getBytes());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
ps.setLong(3, System.currentTimeMillis());
|
ps.setLong(3, System.currentTimeMillis());
|
||||||
rowsAffected = ps.executeUpdate();
|
rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
@@ -546,15 +546,15 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<BatchId> findLostBatches(Connection txn, NeighbourId n,
|
private Set<BatchId> findLostBatches(Connection txn, ContactId c,
|
||||||
byte[] lastBundleReceived) throws DbException {
|
byte[] lastBundleReceived) throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT batchId FROM outstandingBatches"
|
String sql = "SELECT batchId FROM outstandingBatches"
|
||||||
+ " WHERE neighbourId = ? AND lastBundleReceived = ?";
|
+ " WHERE contactId = ? AND lastBundleReceived = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
ps.setBytes(2, lastBundleReceived);
|
ps.setBytes(2, lastBundleReceived);
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
Set<BatchId> lost = new HashSet<BatchId>();
|
Set<BatchId> lost = new HashSet<BatchId>();
|
||||||
@@ -586,15 +586,15 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addSubscription(Connection txn, NeighbourId n, GroupId g)
|
public void addSubscription(Connection txn, ContactId c, GroupId g)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
try {
|
try {
|
||||||
String sql = "INSERT INTO neighbourSubscriptions"
|
String sql = "INSERT INTO contactSubscriptions"
|
||||||
+ " (neighbourId, groupId)"
|
+ " (contactId, groupId)"
|
||||||
+ " VALUES (?, ?)";
|
+ " VALUES (?, ?)";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
ps.setBytes(2, g.getBytes());
|
ps.setBytes(2, g.getBytes());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
@@ -606,14 +606,14 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearSubscriptions(Connection txn, NeighbourId n)
|
public void clearSubscriptions(Connection txn, ContactId c)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
try {
|
try {
|
||||||
String sql = "DELETE FROM neighbourSubscriptions"
|
String sql = "DELETE FROM contactSubscriptions"
|
||||||
+ " WHERE neighbourId = ?";
|
+ " WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
ps.executeUpdate();
|
ps.executeUpdate();
|
||||||
ps.close();
|
ps.close();
|
||||||
} catch(SQLException e) {
|
} catch(SQLException e) {
|
||||||
@@ -623,15 +623,15 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean containsMessage(Connection txn, MessageId m)
|
public boolean containsContact(Connection txn, ContactId c)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT COUNT(messageId) FROM messages"
|
String sql = "SELECT COUNT(contactId) FROM contacts"
|
||||||
+ " WHERE messageId = ?";
|
+ " WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, m.getBytes());
|
ps.setInt(1, c.getInt());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
boolean found = rs.next();
|
boolean found = rs.next();
|
||||||
assert found;
|
assert found;
|
||||||
@@ -650,15 +650,15 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean containsNeighbour(Connection txn, NeighbourId n)
|
public boolean containsMessage(Connection txn, MessageId m)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT COUNT(neighbourId) FROM neighbours"
|
String sql = "SELECT COUNT(messageId) FROM messages"
|
||||||
+ " WHERE neighbourId = ?";
|
+ " WHERE messageId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setBytes(1, m.getBytes());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
boolean found = rs.next();
|
boolean found = rs.next();
|
||||||
assert found;
|
assert found;
|
||||||
@@ -704,6 +704,26 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Set<ContactId> getContacts(Connection txn) throws DbException {
|
||||||
|
PreparedStatement ps = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
String sql = "SELECT contactId FROM contacts";
|
||||||
|
ps = txn.prepareStatement(sql);
|
||||||
|
rs = ps.executeQuery();
|
||||||
|
Set<ContactId> ids = new HashSet<ContactId>();
|
||||||
|
while(rs.next()) ids.add(new ContactId(rs.getInt(1)));
|
||||||
|
rs.close();
|
||||||
|
ps.close();
|
||||||
|
return ids;
|
||||||
|
} catch(SQLException e) {
|
||||||
|
tryToClose(rs);
|
||||||
|
tryToClose(ps);
|
||||||
|
tryToClose(txn);
|
||||||
|
throw new DbException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected long getDiskSpace(File f) {
|
protected long getDiskSpace(File f) {
|
||||||
long total = 0L;
|
long total = 0L;
|
||||||
if(f.isDirectory()) {
|
if(f.isDirectory()) {
|
||||||
@@ -790,26 +810,6 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<NeighbourId> getNeighbours(Connection txn) throws DbException {
|
|
||||||
PreparedStatement ps = null;
|
|
||||||
ResultSet rs = null;
|
|
||||||
try {
|
|
||||||
String sql = "SELECT neighbourId FROM neighbours";
|
|
||||||
ps = txn.prepareStatement(sql);
|
|
||||||
rs = ps.executeQuery();
|
|
||||||
Set<NeighbourId> ids = new HashSet<NeighbourId>();
|
|
||||||
while(rs.next()) ids.add(new NeighbourId(rs.getInt(1)));
|
|
||||||
rs.close();
|
|
||||||
ps.close();
|
|
||||||
return ids;
|
|
||||||
} catch(SQLException e) {
|
|
||||||
tryToClose(rs);
|
|
||||||
tryToClose(ps);
|
|
||||||
tryToClose(txn);
|
|
||||||
throw new DbException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getNumberOfMessages(Connection txn) throws DbException {
|
public int getNumberOfMessages(Connection txn) throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
@@ -936,19 +936,19 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Iterable<MessageId> getSendableMessages(Connection txn,
|
public Iterable<MessageId> getSendableMessages(Connection txn,
|
||||||
NeighbourId n, long capacity) throws DbException {
|
ContactId c, long capacity) throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT size, messages.messageId FROM messages"
|
String sql = "SELECT size, messages.messageId FROM messages"
|
||||||
+ " JOIN neighbourSubscriptions"
|
+ " JOIN contactSubscriptions"
|
||||||
+ " ON messages.groupId = neighbourSubscriptions.groupId"
|
+ " ON messages.groupId = contactSubscriptions.groupId"
|
||||||
+ " JOIN statuses ON messages.messageId = statuses.messageId"
|
+ " JOIN statuses ON messages.messageId = statuses.messageId"
|
||||||
+ " WHERE neighbourSubscriptions.neighbourId = ?"
|
+ " WHERE contactSubscriptions.contactId = ?"
|
||||||
+ " AND statuses.neighbourId = ? AND status = ?";
|
+ " AND statuses.contactId = ? AND status = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
ps.setShort(3, (short) Status.NEW.ordinal());
|
ps.setShort(3, (short) Status.NEW.ordinal());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
List<MessageId> ids = new ArrayList<MessageId>();
|
List<MessageId> ids = new ArrayList<MessageId>();
|
||||||
@@ -995,27 +995,27 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeAckedBatch(Connection txn, NeighbourId n, BatchId b)
|
public void removeAckedBatch(Connection txn, ContactId c, BatchId b)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
removeBatch(txn, n, b, Status.SEEN);
|
removeBatch(txn, c, b, Status.SEEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeBatch(Connection txn, NeighbourId n, BatchId b,
|
private void removeBatch(Connection txn, ContactId c, BatchId b,
|
||||||
Status newStatus) throws DbException {
|
Status newStatus) throws DbException {
|
||||||
PreparedStatement ps = null, ps1 = null;
|
PreparedStatement ps = null, ps1 = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT messageId FROM outstandingMessages"
|
String sql = "SELECT messageId FROM outstandingMessages"
|
||||||
+ " WHERE neighbourId = ? AND batchId = ?";
|
+ " WHERE contactId = ? AND batchId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
ps.setBytes(2, b.getBytes());
|
ps.setBytes(2, b.getBytes());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
sql = "UPDATE statuses SET status = ?"
|
sql = "UPDATE statuses SET status = ?"
|
||||||
+ " WHERE messageId = ? AND neighbourId = ? AND status = ?";
|
+ " WHERE messageId = ? AND contactId = ? AND status = ?";
|
||||||
ps1 = txn.prepareStatement(sql);
|
ps1 = txn.prepareStatement(sql);
|
||||||
ps1.setShort(1, (short) newStatus.ordinal());
|
ps1.setShort(1, (short) newStatus.ordinal());
|
||||||
ps1.setInt(3, n.getInt());
|
ps1.setInt(3, c.getInt());
|
||||||
ps1.setShort(4, (short) Status.SENT.ordinal());
|
ps1.setShort(4, (short) Status.SENT.ordinal());
|
||||||
int messages = 0;
|
int messages = 0;
|
||||||
while(rs.next()) {
|
while(rs.next()) {
|
||||||
@@ -1047,23 +1047,23 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<BatchId> removeBatchesToAck(Connection txn, NeighbourId n)
|
public Set<BatchId> removeBatchesToAck(Connection txn, ContactId c)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT batchId FROM batchesToAck"
|
String sql = "SELECT batchId FROM batchesToAck"
|
||||||
+ " WHERE neighbourId = ?";
|
+ " WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
Set<BatchId> ids = new HashSet<BatchId>();
|
Set<BatchId> ids = new HashSet<BatchId>();
|
||||||
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
|
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
|
||||||
rs.close();
|
rs.close();
|
||||||
ps.close();
|
ps.close();
|
||||||
sql = "DELETE FROM batchesToAck WHERE neighbourId = ?";
|
sql = "DELETE FROM batchesToAck WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setInt(1, c.getInt());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == ids.size();
|
assert rowsAffected == ids.size();
|
||||||
ps.close();
|
ps.close();
|
||||||
@@ -1076,17 +1076,13 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeLostBatch(Connection txn, NeighbourId n, BatchId b)
|
public void removeContact(Connection txn, ContactId c)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
removeBatch(txn, n, b, Status.NEW);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeMessage(Connection txn, MessageId m) throws DbException {
|
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
try {
|
try {
|
||||||
String sql = "DELETE FROM messages WHERE messageId = ?";
|
String sql = "DELETE FROM contacts WHERE contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, m.getBytes());
|
ps.setInt(1, c.getInt());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
ps.close();
|
ps.close();
|
||||||
@@ -1097,13 +1093,17 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeNeighbour(Connection txn, NeighbourId n)
|
public void removeLostBatch(Connection txn, ContactId c, BatchId b)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
|
removeBatch(txn, c, b, Status.NEW);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeMessage(Connection txn, MessageId m) throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
try {
|
try {
|
||||||
String sql = "DELETE FROM neighbours WHERE neighbourId = ?";
|
String sql = "DELETE FROM messages WHERE messageId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setInt(1, n.getInt());
|
ps.setBytes(1, m.getBytes());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
ps.close();
|
ps.close();
|
||||||
@@ -1194,16 +1194,16 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setStatus(Connection txn, NeighbourId n, MessageId m, Status s)
|
public void setStatus(Connection txn, ContactId c, MessageId m, Status s)
|
||||||
throws DbException {
|
throws DbException {
|
||||||
PreparedStatement ps = null;
|
PreparedStatement ps = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT status FROM statuses"
|
String sql = "SELECT status FROM statuses"
|
||||||
+ " WHERE messageId = ? AND neighbourId = ?";
|
+ " WHERE messageId = ? AND contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, m.getBytes());
|
ps.setBytes(1, m.getBytes());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
rs = ps.executeQuery();
|
rs = ps.executeQuery();
|
||||||
if(rs.next()) {
|
if(rs.next()) {
|
||||||
Status old = Status.values()[rs.getByte(1)];
|
Status old = Status.values()[rs.getByte(1)];
|
||||||
@@ -1213,11 +1213,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
ps.close();
|
ps.close();
|
||||||
if(!old.equals(Status.SEEN) && !old.equals(s)) {
|
if(!old.equals(Status.SEEN) && !old.equals(s)) {
|
||||||
sql = "UPDATE statuses SET status = ?"
|
sql = "UPDATE statuses SET status = ?"
|
||||||
+ " WHERE messageId = ? AND neighbourId = ?";
|
+ " WHERE messageId = ? AND contactId = ?";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setShort(1, (short) s.ordinal());
|
ps.setShort(1, (short) s.ordinal());
|
||||||
ps.setBytes(2, m.getBytes());
|
ps.setBytes(2, m.getBytes());
|
||||||
ps.setInt(3, n.getInt());
|
ps.setInt(3, c.getInt());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
ps.close();
|
ps.close();
|
||||||
@@ -1225,11 +1225,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
|||||||
} else {
|
} else {
|
||||||
rs.close();
|
rs.close();
|
||||||
ps.close();
|
ps.close();
|
||||||
sql = "INSERT INTO statuses (messageId, neighbourId, status)"
|
sql = "INSERT INTO statuses (messageId, contactId, status)"
|
||||||
+ " VALUES (?, ?, ?)";
|
+ " VALUES (?, ?, ?)";
|
||||||
ps = txn.prepareStatement(sql);
|
ps = txn.prepareStatement(sql);
|
||||||
ps.setBytes(1, m.getBytes());
|
ps.setBytes(1, m.getBytes());
|
||||||
ps.setInt(2, n.getInt());
|
ps.setInt(2, c.getInt());
|
||||||
ps.setShort(3, (short) s.ordinal());
|
ps.setShort(3, (short) s.ordinal());
|
||||||
int rowsAffected = ps.executeUpdate();
|
int rowsAffected = ps.executeUpdate();
|
||||||
assert rowsAffected == 1;
|
assert rowsAffected == 1;
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import java.util.logging.Level;
|
|||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
import net.sf.briar.api.db.DbException;
|
import net.sf.briar.api.db.DbException;
|
||||||
import net.sf.briar.api.db.NeighbourId;
|
import net.sf.briar.api.db.ContactId;
|
||||||
import net.sf.briar.api.db.Rating;
|
import net.sf.briar.api.db.Rating;
|
||||||
import net.sf.briar.api.protocol.AuthorId;
|
import net.sf.briar.api.protocol.AuthorId;
|
||||||
import net.sf.briar.api.protocol.Batch;
|
import net.sf.briar.api.protocol.Batch;
|
||||||
@@ -49,34 +49,6 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
super(db, batchProvider);
|
super(db, batchProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void expireMessages(long size) throws DbException {
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
messageLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
for(MessageId m : db.getOldMessages(txn, size)) {
|
|
||||||
removeMessage(txn, m);
|
|
||||||
}
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close() throws DbException {
|
public void close() throws DbException {
|
||||||
contactLock.writeLock().lock();
|
contactLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
@@ -106,15 +78,15 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addNeighbour(NeighbourId n) throws DbException {
|
public void addContact(ContactId c) throws DbException {
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n);
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding contact " + c);
|
||||||
contactLock.writeLock().lock();
|
contactLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
messageStatusLock.writeLock().lock();
|
messageStatusLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
Txn txn = db.startTransaction();
|
Txn txn = db.startTransaction();
|
||||||
try {
|
try {
|
||||||
db.addNeighbour(txn, n);
|
db.addContact(txn, c);
|
||||||
db.commitTransaction(txn);
|
db.commitTransaction(txn);
|
||||||
} catch(DbException e) {
|
} catch(DbException e) {
|
||||||
db.abortTransaction(txn);
|
db.abortTransaction(txn);
|
||||||
@@ -166,6 +138,164 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void expireMessages(long size) throws DbException {
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
messageLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
for(MessageId m : db.getOldMessages(txn, size)) {
|
||||||
|
removeMessage(txn, m);
|
||||||
|
}
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void generateBundle(ContactId c, Bundle b) throws DbException {
|
||||||
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
|
||||||
|
// Ack all batches received from c
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
int numAcks = 0;
|
||||||
|
for(BatchId ack : db.removeBatchesToAck(txn, c)) {
|
||||||
|
b.addAck(ack);
|
||||||
|
numAcks++;
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Added " + numAcks + " acks");
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
// Add a list of subscriptions
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
subscriptionLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
int numSubs = 0;
|
||||||
|
for(GroupId g : db.getSubscriptions(txn)) {
|
||||||
|
b.addSubscription(g);
|
||||||
|
numSubs++;
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Added " + numSubs + " subscriptions");
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
subscriptionLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
// Add as many messages as possible to the bundle
|
||||||
|
long capacity = b.getCapacity();
|
||||||
|
while(true) {
|
||||||
|
Batch batch = fillBatch(c, capacity);
|
||||||
|
if(batch == null) break; // No more messages to send
|
||||||
|
b.addBatch(batch);
|
||||||
|
capacity -= batch.getSize();
|
||||||
|
// If the batch is less than half full, stop trying - there may be
|
||||||
|
// more messages trickling in but we can't wait forever
|
||||||
|
if(batch.getSize() * 2 < Batch.CAPACITY) break;
|
||||||
|
}
|
||||||
|
b.seal();
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Bundle sent, " + b.getSize() + " bytes");
|
||||||
|
System.gc();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Batch fillBatch(ContactId c, long capacity) throws DbException {
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return null;
|
||||||
|
messageLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
Set<MessageId> sent;
|
||||||
|
Batch b;
|
||||||
|
messageStatusLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
capacity = Math.min(capacity, Batch.CAPACITY);
|
||||||
|
Iterator<MessageId> it =
|
||||||
|
db.getSendableMessages(txn, c, capacity).iterator();
|
||||||
|
if(!it.hasNext()) {
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
return null; // No more messages to send
|
||||||
|
}
|
||||||
|
sent = new HashSet<MessageId>();
|
||||||
|
b = batchProvider.get();
|
||||||
|
while(it.hasNext()) {
|
||||||
|
MessageId m = it.next();
|
||||||
|
b.addMessage(db.getMessage(txn, m));
|
||||||
|
sent.add(m);
|
||||||
|
}
|
||||||
|
b.seal();
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
// Record the contents of the batch
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
assert !sent.isEmpty();
|
||||||
|
db.addOutstandingBatch(txn, c, b.getId(), sent);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
return b;
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Rating getRating(AuthorId a) throws DbException {
|
public Rating getRating(AuthorId a) throws DbException {
|
||||||
ratingLock.readLock().lock();
|
ratingLock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
@@ -183,15 +313,200 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeNeighbour(NeighbourId n) throws DbException {
|
public Set<GroupId> getSubscriptions() throws DbException {
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing neighbour " + n);
|
subscriptionLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
HashSet<GroupId> subs = new HashSet<GroupId>();
|
||||||
|
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
return subs;
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
subscriptionLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void receiveBundle(ContactId c, Bundle b) throws DbException {
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received bundle from " + c + ", "
|
||||||
|
+ b.getSize() + " bytes");
|
||||||
|
// Mark all messages in acked batches as seen
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
messageLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
int acks = 0;
|
||||||
|
for(BatchId ack : b.getAcks()) {
|
||||||
|
acks++;
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
db.removeAckedBatch(txn, c, ack);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + acks + " acks");
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
// Update the contact's subscriptions
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
db.clearSubscriptions(txn, c);
|
||||||
|
int subs = 0;
|
||||||
|
for(GroupId g : b.getSubscriptions()) {
|
||||||
|
subs++;
|
||||||
|
db.addSubscription(txn, c, g);
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + subs + " subscriptions");
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
}
|
||||||
|
// Store the messages
|
||||||
|
int batches = 0;
|
||||||
|
for(Batch batch : b.getBatches()) {
|
||||||
|
batches++;
|
||||||
|
waitForPermissionToWrite();
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
messageLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
subscriptionLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
int received = 0, stored = 0;
|
||||||
|
for(Message m : batch.getMessages()) {
|
||||||
|
received++;
|
||||||
|
GroupId g = m.getGroup();
|
||||||
|
if(db.containsSubscription(txn, g)) {
|
||||||
|
if(storeMessage(txn, m, c)) stored++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + received
|
||||||
|
+ " messages, stored " + stored);
|
||||||
|
db.addBatchToAck(txn, c, batch.getId());
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
subscriptionLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + batches + " batches");
|
||||||
|
// Find any lost batches that need to be retransmitted
|
||||||
|
Set<BatchId> lost;
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
messageLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
lost = db.addReceivedBundle(txn, c, b.getId());
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
for(BatchId batch : lost) {
|
||||||
|
contactLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
messageLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
messageStatusLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Removing lost batch");
|
||||||
|
db.removeLostBatch(txn, c, batch);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageStatusLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
messageLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
contactLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
System.gc();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeContact(ContactId c) throws DbException {
|
||||||
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing contact " + c);
|
||||||
contactLock.writeLock().lock();
|
contactLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
messageStatusLock.writeLock().lock();
|
messageStatusLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
Txn txn = db.startTransaction();
|
Txn txn = db.startTransaction();
|
||||||
try {
|
try {
|
||||||
db.removeNeighbour(txn, n);
|
db.removeContact(txn, c);
|
||||||
db.commitTransaction(txn);
|
db.commitTransaction(txn);
|
||||||
} catch(DbException e) {
|
} catch(DbException e) {
|
||||||
db.abortTransaction(txn);
|
db.abortTransaction(txn);
|
||||||
@@ -231,24 +546,6 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<GroupId> getSubscriptions() throws DbException {
|
|
||||||
subscriptionLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
HashSet<GroupId> subs = new HashSet<GroupId>();
|
|
||||||
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
return subs;
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
subscriptionLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void subscribe(GroupId g) throws DbException {
|
public void subscribe(GroupId g) throws DbException {
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
|
||||||
subscriptionLock.writeLock().lock();
|
subscriptionLock.writeLock().lock();
|
||||||
@@ -297,301 +594,4 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
contactLock.readLock().unlock();
|
contactLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void generateBundle(NeighbourId n, Bundle b) throws DbException {
|
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n);
|
|
||||||
// Ack all batches received from the neighbour
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
int numAcks = 0;
|
|
||||||
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
|
|
||||||
b.addAck(ack);
|
|
||||||
numAcks++;
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Added " + numAcks + " acks");
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
// Add a list of subscriptions
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
subscriptionLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
int numSubs = 0;
|
|
||||||
for(GroupId g : db.getSubscriptions(txn)) {
|
|
||||||
b.addSubscription(g);
|
|
||||||
numSubs++;
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Added " + numSubs + " subscriptions");
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
subscriptionLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
// Add as many messages as possible to the bundle
|
|
||||||
long capacity = b.getCapacity();
|
|
||||||
while(true) {
|
|
||||||
Batch batch = fillBatch(n, capacity);
|
|
||||||
if(batch == null) break; // No more messages to send
|
|
||||||
b.addBatch(batch);
|
|
||||||
capacity -= batch.getSize();
|
|
||||||
// If the batch is less than half full, stop trying - there may be
|
|
||||||
// more messages trickling in but we can't wait forever
|
|
||||||
if(batch.getSize() * 2 < Batch.CAPACITY) break;
|
|
||||||
}
|
|
||||||
b.seal();
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Bundle sent, " + b.getSize() + " bytes");
|
|
||||||
System.gc();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Batch fillBatch(NeighbourId n, long capacity) throws DbException {
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return null;
|
|
||||||
messageLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
Set<MessageId> sent;
|
|
||||||
Batch b;
|
|
||||||
messageStatusLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
capacity = Math.min(capacity, Batch.CAPACITY);
|
|
||||||
Iterator<MessageId> it =
|
|
||||||
db.getSendableMessages(txn, n, capacity).iterator();
|
|
||||||
if(!it.hasNext()) {
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
return null; // No more messages to send
|
|
||||||
}
|
|
||||||
sent = new HashSet<MessageId>();
|
|
||||||
b = batchProvider.get();
|
|
||||||
while(it.hasNext()) {
|
|
||||||
MessageId m = it.next();
|
|
||||||
b.addMessage(db.getMessage(txn, m));
|
|
||||||
sent.add(m);
|
|
||||||
}
|
|
||||||
b.seal();
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
// Record the contents of the batch
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
assert !sent.isEmpty();
|
|
||||||
db.addOutstandingBatch(txn, n, b.getId(), sent);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
return b;
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void receiveBundle(NeighbourId n, Bundle b) throws DbException {
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received bundle from " + n + ", "
|
|
||||||
+ b.getSize() + " bytes");
|
|
||||||
// Mark all messages in acked batches as seen
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
messageLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
int acks = 0;
|
|
||||||
for(BatchId ack : b.getAcks()) {
|
|
||||||
acks++;
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
db.removeAckedBatch(txn, n, ack);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + acks + " acks");
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
// Update the neighbour's subscriptions
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
db.clearSubscriptions(txn, n);
|
|
||||||
int subs = 0;
|
|
||||||
for(GroupId g : b.getSubscriptions()) {
|
|
||||||
subs++;
|
|
||||||
db.addSubscription(txn, n, g);
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + subs + " subscriptions");
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
}
|
|
||||||
// Store the messages
|
|
||||||
int batches = 0;
|
|
||||||
for(Batch batch : b.getBatches()) {
|
|
||||||
batches++;
|
|
||||||
waitForPermissionToWrite();
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
messageLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
subscriptionLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
int received = 0, stored = 0;
|
|
||||||
for(Message m : batch.getMessages()) {
|
|
||||||
received++;
|
|
||||||
GroupId g = m.getGroup();
|
|
||||||
if(db.containsSubscription(txn, g)) {
|
|
||||||
if(storeMessage(txn, m, n)) stored++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + received
|
|
||||||
+ " messages, stored " + stored);
|
|
||||||
db.addBatchToAck(txn, n, batch.getId());
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
subscriptionLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + batches + " batches");
|
|
||||||
// Find any lost batches that need to be retransmitted
|
|
||||||
Set<BatchId> lost;
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
messageLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
lost = db.addReceivedBundle(txn, n, b.getId());
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
for(BatchId batch : lost) {
|
|
||||||
contactLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
messageLock.readLock().lock();
|
|
||||||
try {
|
|
||||||
messageStatusLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Removing lost batch");
|
|
||||||
db.removeLostBatch(txn, n, batch);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageStatusLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
messageLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
contactLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
System.gc();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
@@ -7,7 +7,7 @@ import java.util.logging.Level;
|
|||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
import net.sf.briar.api.db.DbException;
|
import net.sf.briar.api.db.DbException;
|
||||||
import net.sf.briar.api.db.NeighbourId;
|
import net.sf.briar.api.db.ContactId;
|
||||||
import net.sf.briar.api.db.Rating;
|
import net.sf.briar.api.db.Rating;
|
||||||
import net.sf.briar.api.protocol.AuthorId;
|
import net.sf.briar.api.protocol.AuthorId;
|
||||||
import net.sf.briar.api.protocol.Batch;
|
import net.sf.briar.api.protocol.Batch;
|
||||||
@@ -42,25 +42,6 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
super(db, batchProvider);
|
super(db, batchProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void expireMessages(long size) throws DbException {
|
|
||||||
synchronized(contactLock) {
|
|
||||||
synchronized(messageLock) {
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
for(MessageId m : db.getOldMessages(txn, size)) {
|
|
||||||
removeMessage(txn, m);
|
|
||||||
}
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close() throws DbException {
|
public void close() throws DbException {
|
||||||
synchronized(contactLock) {
|
synchronized(contactLock) {
|
||||||
synchronized(messageLock) {
|
synchronized(messageLock) {
|
||||||
@@ -75,13 +56,13 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addNeighbour(NeighbourId n) throws DbException {
|
public void addContact(ContactId c) throws DbException {
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n);
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding contact " + c);
|
||||||
synchronized(contactLock) {
|
synchronized(contactLock) {
|
||||||
synchronized(messageStatusLock) {
|
synchronized(messageStatusLock) {
|
||||||
Txn txn = db.startTransaction();
|
Txn txn = db.startTransaction();
|
||||||
try {
|
try {
|
||||||
db.addNeighbour(txn, n);
|
db.addContact(txn, c);
|
||||||
db.commitTransaction(txn);
|
db.commitTransaction(txn);
|
||||||
} catch(DbException e) {
|
} catch(DbException e) {
|
||||||
db.abortTransaction(txn);
|
db.abortTransaction(txn);
|
||||||
@@ -117,6 +98,120 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void expireMessages(long size) throws DbException {
|
||||||
|
synchronized(contactLock) {
|
||||||
|
synchronized(messageLock) {
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
for(MessageId m : db.getOldMessages(txn, size)) {
|
||||||
|
removeMessage(txn, m);
|
||||||
|
}
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void generateBundle(ContactId c, Bundle b) throws DbException {
|
||||||
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
|
||||||
|
// Ack all batches received from c
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
int numAcks = 0;
|
||||||
|
for(BatchId ack : db.removeBatchesToAck(txn, c)) {
|
||||||
|
b.addAck(ack);
|
||||||
|
numAcks++;
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Added " + numAcks + " acks");
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Add a list of subscriptions
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
synchronized(subscriptionLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
int numSubs = 0;
|
||||||
|
for(GroupId g : db.getSubscriptions(txn)) {
|
||||||
|
b.addSubscription(g);
|
||||||
|
numSubs++;
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Added " + numSubs + " subscriptions");
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Add as many messages as possible to the bundle
|
||||||
|
long capacity = b.getCapacity();
|
||||||
|
while(true) {
|
||||||
|
Batch batch = fillBatch(c, capacity);
|
||||||
|
if(batch == null) break; // No more messages to send
|
||||||
|
b.addBatch(batch);
|
||||||
|
capacity -= batch.getSize();
|
||||||
|
// If the batch is less than half full, stop trying - there may be
|
||||||
|
// more messages trickling in but we can't wait forever
|
||||||
|
if(batch.getSize() * 2 < Batch.CAPACITY) break;
|
||||||
|
}
|
||||||
|
b.seal();
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Bundle sent, " + b.getSize() + " bytes");
|
||||||
|
System.gc();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Batch fillBatch(ContactId c, long capacity) throws DbException {
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return null;
|
||||||
|
synchronized(messageLock) {
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
capacity = Math.min(capacity, Batch.CAPACITY);
|
||||||
|
Iterator<MessageId> it =
|
||||||
|
db.getSendableMessages(txn, c, capacity).iterator();
|
||||||
|
if(!it.hasNext()) {
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
return null; // No more messages to send
|
||||||
|
}
|
||||||
|
Batch b = batchProvider.get();
|
||||||
|
Set<MessageId> sent = new HashSet<MessageId>();
|
||||||
|
while(it.hasNext()) {
|
||||||
|
MessageId m = it.next();
|
||||||
|
b.addMessage(db.getMessage(txn, m));
|
||||||
|
sent.add(m);
|
||||||
|
}
|
||||||
|
b.seal();
|
||||||
|
// Record the contents of the batch
|
||||||
|
assert !sent.isEmpty();
|
||||||
|
db.addOutstandingBatch(txn, c, b.getId(), sent);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
return b;
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Rating getRating(AuthorId a) throws DbException {
|
public Rating getRating(AuthorId a) throws DbException {
|
||||||
synchronized(ratingLock) {
|
synchronized(ratingLock) {
|
||||||
Txn txn = db.startTransaction();
|
Txn txn = db.startTransaction();
|
||||||
@@ -131,6 +226,159 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Set<GroupId> getSubscriptions() throws DbException {
|
||||||
|
synchronized(subscriptionLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
HashSet<GroupId> subs = new HashSet<GroupId>();
|
||||||
|
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
return subs;
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void receiveBundle(ContactId c, Bundle b) throws DbException {
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received bundle from " + c + ", "
|
||||||
|
+ b.getSize() + " bytes");
|
||||||
|
// Mark all messages in acked batches as seen
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
synchronized(messageLock) {
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
int acks = 0;
|
||||||
|
for(BatchId ack : b.getAcks()) {
|
||||||
|
acks++;
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
db.removeAckedBatch(txn, c, ack);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + acks + " acks");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Update the contact's subscriptions
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
db.clearSubscriptions(txn, c);
|
||||||
|
int subs = 0;
|
||||||
|
for(GroupId g : b.getSubscriptions()) {
|
||||||
|
subs++;
|
||||||
|
db.addSubscription(txn, c, g);
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + subs + " subscriptions");
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Store the messages
|
||||||
|
int batches = 0;
|
||||||
|
for(Batch batch : b.getBatches()) {
|
||||||
|
batches++;
|
||||||
|
waitForPermissionToWrite();
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
synchronized(messageLock) {
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
synchronized(subscriptionLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
int received = 0, stored = 0;
|
||||||
|
for(Message m : batch.getMessages()) {
|
||||||
|
received++;
|
||||||
|
GroupId g = m.getGroup();
|
||||||
|
if(db.containsSubscription(txn, g)) {
|
||||||
|
if(storeMessage(txn, m, c)) stored++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + received
|
||||||
|
+ " messages, stored " + stored);
|
||||||
|
db.addBatchToAck(txn, c, batch.getId());
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Received " + batches + " batches");
|
||||||
|
// Find any lost batches that need to be retransmitted
|
||||||
|
Set<BatchId> lost;
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
synchronized(messageLock) {
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
lost = db.addReceivedBundle(txn, c, b.getId());
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for(BatchId batch : lost) {
|
||||||
|
synchronized(contactLock) {
|
||||||
|
if(!containsContact(c)) return;
|
||||||
|
synchronized(messageLock) {
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
if(LOG.isLoggable(Level.FINE))
|
||||||
|
LOG.fine("Removing lost batch");
|
||||||
|
db.removeLostBatch(txn, c, batch);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
System.gc();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeContact(ContactId c) throws DbException {
|
||||||
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing contact " + c);
|
||||||
|
synchronized(contactLock) {
|
||||||
|
synchronized(messageStatusLock) {
|
||||||
|
Txn txn = db.startTransaction();
|
||||||
|
try {
|
||||||
|
db.removeContact(txn, c);
|
||||||
|
db.commitTransaction(txn);
|
||||||
|
} catch(DbException e) {
|
||||||
|
db.abortTransaction(txn);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void setRating(AuthorId a, Rating r) throws DbException {
|
public void setRating(AuthorId a, Rating r) throws DbException {
|
||||||
synchronized(messageLock) {
|
synchronized(messageLock) {
|
||||||
synchronized(ratingLock) {
|
synchronized(ratingLock) {
|
||||||
@@ -151,21 +399,6 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<GroupId> getSubscriptions() throws DbException {
|
|
||||||
synchronized(subscriptionLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
HashSet<GroupId> subs = new HashSet<GroupId>();
|
|
||||||
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
return subs;
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void subscribe(GroupId g) throws DbException {
|
public void subscribe(GroupId g) throws DbException {
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
|
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
|
||||||
synchronized(subscriptionLock) {
|
synchronized(subscriptionLock) {
|
||||||
@@ -199,237 +432,4 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void generateBundle(NeighbourId n, Bundle b) throws DbException {
|
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n);
|
|
||||||
// Ack all batches received from the neighbour
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
int numAcks = 0;
|
|
||||||
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
|
|
||||||
b.addAck(ack);
|
|
||||||
numAcks++;
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Added " + numAcks + " acks");
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Add a list of subscriptions
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
synchronized(subscriptionLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
int numSubs = 0;
|
|
||||||
for(GroupId g : db.getSubscriptions(txn)) {
|
|
||||||
b.addSubscription(g);
|
|
||||||
numSubs++;
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Added " + numSubs + " subscriptions");
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Add as many messages as possible to the bundle
|
|
||||||
long capacity = b.getCapacity();
|
|
||||||
while(true) {
|
|
||||||
Batch batch = fillBatch(n, capacity);
|
|
||||||
if(batch == null) break; // No more messages to send
|
|
||||||
b.addBatch(batch);
|
|
||||||
capacity -= batch.getSize();
|
|
||||||
// If the batch is less than half full, stop trying - there may be
|
|
||||||
// more messages trickling in but we can't wait forever
|
|
||||||
if(batch.getSize() * 2 < Batch.CAPACITY) break;
|
|
||||||
}
|
|
||||||
b.seal();
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Bundle sent, " + b.getSize() + " bytes");
|
|
||||||
System.gc();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Batch fillBatch(NeighbourId n, long capacity) throws DbException {
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return null;
|
|
||||||
synchronized(messageLock) {
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
capacity = Math.min(capacity, Batch.CAPACITY);
|
|
||||||
Iterator<MessageId> it =
|
|
||||||
db.getSendableMessages(txn, n, capacity).iterator();
|
|
||||||
if(!it.hasNext()) {
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
return null; // No more messages to send
|
|
||||||
}
|
|
||||||
Batch b = batchProvider.get();
|
|
||||||
Set<MessageId> sent = new HashSet<MessageId>();
|
|
||||||
while(it.hasNext()) {
|
|
||||||
MessageId m = it.next();
|
|
||||||
b.addMessage(db.getMessage(txn, m));
|
|
||||||
sent.add(m);
|
|
||||||
}
|
|
||||||
b.seal();
|
|
||||||
// Record the contents of the batch
|
|
||||||
assert !sent.isEmpty();
|
|
||||||
db.addOutstandingBatch(txn, n, b.getId(), sent);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
return b;
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeNeighbour(NeighbourId n) throws DbException {
|
|
||||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing neighbour " + n);
|
|
||||||
synchronized(contactLock) {
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
db.removeNeighbour(txn, n);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void receiveBundle(NeighbourId n, Bundle b) throws DbException {
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received bundle from " + n + ", "
|
|
||||||
+ b.getSize() + " bytes");
|
|
||||||
// Mark all messages in acked batches as seen
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
synchronized(messageLock) {
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
int acks = 0;
|
|
||||||
for(BatchId ack : b.getAcks()) {
|
|
||||||
acks++;
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
db.removeAckedBatch(txn, n, ack);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + acks + " acks");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Update the neighbour's subscriptions
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
db.clearSubscriptions(txn, n);
|
|
||||||
int subs = 0;
|
|
||||||
for(GroupId g : b.getSubscriptions()) {
|
|
||||||
subs++;
|
|
||||||
db.addSubscription(txn, n, g);
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + subs + " subscriptions");
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Store the messages
|
|
||||||
int batches = 0;
|
|
||||||
for(Batch batch : b.getBatches()) {
|
|
||||||
batches++;
|
|
||||||
waitForPermissionToWrite();
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
synchronized(messageLock) {
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
synchronized(subscriptionLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
int received = 0, stored = 0;
|
|
||||||
for(Message m : batch.getMessages()) {
|
|
||||||
received++;
|
|
||||||
GroupId g = m.getGroup();
|
|
||||||
if(db.containsSubscription(txn, g)) {
|
|
||||||
if(storeMessage(txn, m, n)) stored++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + received
|
|
||||||
+ " messages, stored " + stored);
|
|
||||||
db.addBatchToAck(txn, n, batch.getId());
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Received " + batches + " batches");
|
|
||||||
// Find any lost batches that need to be retransmitted
|
|
||||||
Set<BatchId> lost;
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
synchronized(messageLock) {
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
lost = db.addReceivedBundle(txn, n, b.getId());
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for(BatchId batch : lost) {
|
|
||||||
synchronized(contactLock) {
|
|
||||||
if(!containsNeighbour(n)) return;
|
|
||||||
synchronized(messageLock) {
|
|
||||||
synchronized(messageStatusLock) {
|
|
||||||
Txn txn = db.startTransaction();
|
|
||||||
try {
|
|
||||||
if(LOG.isLoggable(Level.FINE))
|
|
||||||
LOG.fine("Removing lost batch");
|
|
||||||
db.removeLostBatch(txn, n, batch);
|
|
||||||
db.commitTransaction(txn);
|
|
||||||
} catch(DbException e) {
|
|
||||||
db.abortTransaction(txn);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
System.gc();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user