mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-11 18:29:05 +01:00
Removed name from startTransaction(), merged the two batch-removal methods.
This commit is contained in:
@@ -26,15 +26,15 @@ public interface DatabaseComponent {
|
||||
static final int CLEANER_SLEEP_MS = 1000; // 1 sec
|
||||
static final int RETRANSMIT_THRESHOLD = 3;
|
||||
|
||||
/** Waits for any open transactions to finish and closes the database. */
|
||||
void close() throws DbException;
|
||||
|
||||
/** Adds a locally generated message to the database. */
|
||||
void addLocallyGeneratedMessage(Message m) throws DbException;
|
||||
|
||||
/** Adds a new neighbour to the database. */
|
||||
void addNeighbour(NeighbourId n) throws DbException;
|
||||
|
||||
/** Waits for any open transactions to finish and closes the database. */
|
||||
void close() throws DbException;
|
||||
|
||||
/** Generates a bundle of messages for the given neighbour. */
|
||||
void generateBundle(NeighbourId n, Bundle b) throws DbException;
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ interface Database<T> {
|
||||
void close() throws DbException;
|
||||
|
||||
/** Starts a new transaction and returns an object representing it. */
|
||||
T startTransaction(String name) throws DbException;
|
||||
T startTransaction() throws DbException;
|
||||
|
||||
/**
|
||||
* Aborts the given transaction - no changes made during the transaction
|
||||
@@ -183,6 +183,14 @@ interface Database<T> {
|
||||
*/
|
||||
Set<GroupId> getSubscriptions(T txn) throws DbException;
|
||||
|
||||
/**
|
||||
* Removes an outstanding batch that has been acknowledged. Any messages in
|
||||
* the batch that are still considered outstanding (Status.SENT) with
|
||||
* respect to the given neighbour are now considered seen (Status.SEEN).
|
||||
* Locking: neighbours write, messages read.
|
||||
*/
|
||||
void removeAckedBatch(T txn, NeighbourId n, BatchId b) throws DbException;
|
||||
|
||||
/**
|
||||
* Removes and returns the IDs of any batches received from the given
|
||||
* neighbour that need to be acknowledged.
|
||||
@@ -191,10 +199,9 @@ interface Database<T> {
|
||||
Set<BatchId> removeBatchesToAck(T txn, NeighbourId n) throws DbException;
|
||||
|
||||
/**
|
||||
* Removes an outstanding batch that is considered to have been lost. Any
|
||||
* messages in the batch that are still considered outstanding
|
||||
* (Status.SENT) with respect to the given neighbour are now considered
|
||||
* unsent (Status.NEW).
|
||||
* Removes an outstanding batch that has been lost. Any messages in the
|
||||
* batch that are still considered outstanding (Status.SENT) with respect
|
||||
* to the given neighbour are now considered unsent (Status.NEW).
|
||||
* Locking: neighbours write, messages read.
|
||||
*/
|
||||
void removeLostBatch(T txn, NeighbourId n, BatchId b) throws DbException;
|
||||
@@ -205,13 +212,6 @@ interface Database<T> {
|
||||
*/
|
||||
void removeMessage(T txn, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Removes an outstanding batch that has been acknowledged. The status of
|
||||
* the acknowledged messages is not updated.
|
||||
* Locking: neighbours write.
|
||||
*/
|
||||
Set<MessageId> removeOutstandingBatch(T txn, NeighbourId n, BatchId b) throws DbException;
|
||||
|
||||
/**
|
||||
* Unsubscribes from the given group. Any messages belonging to the group
|
||||
* are deleted from the database.
|
||||
|
||||
@@ -177,7 +177,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
} catch(ClassNotFoundException e) {
|
||||
throw new DbException(e);
|
||||
}
|
||||
Connection txn = startTransaction("initialize");
|
||||
Connection txn = startTransaction();
|
||||
try {
|
||||
// If not resuming, create the tables
|
||||
if(resume) {
|
||||
@@ -264,7 +264,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
} catch(SQLException ignored) {}
|
||||
}
|
||||
|
||||
public Connection startTransaction(String name) throws DbException {
|
||||
public Connection startTransaction() throws DbException {
|
||||
Connection txn = null;
|
||||
try {
|
||||
synchronized(connections) {
|
||||
@@ -968,6 +968,58 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public void removeAckedBatch(Connection txn, NeighbourId n, BatchId b)
|
||||
throws DbException {
|
||||
removeBatch(txn, n, b, Status.SEEN);
|
||||
}
|
||||
|
||||
private void removeBatch(Connection txn, NeighbourId n, BatchId b,
|
||||
Status newStatus) throws DbException {
|
||||
PreparedStatement ps = null, ps1 = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT messageId FROM outstandingMessages"
|
||||
+ " WHERE neighbourId = ? AND batchId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, n.getInt());
|
||||
ps.setBytes(2, b.getBytes());
|
||||
rs = ps.executeQuery();
|
||||
sql = "UPDATE statuses SET status = ?"
|
||||
+ " WHERE messageId = ? AND neighbourId = ? AND status = ?";
|
||||
ps1 = txn.prepareStatement(sql);
|
||||
ps1.setShort(1, (short) newStatus.ordinal());
|
||||
ps1.setInt(3, n.getInt());
|
||||
ps1.setShort(4, (short) Status.SENT.ordinal());
|
||||
int messages = 0;
|
||||
while(rs.next()) {
|
||||
messages++;
|
||||
ps1.setBytes(2, rs.getBytes(1));
|
||||
ps1.addBatch();
|
||||
}
|
||||
rs.close();
|
||||
ps.close();
|
||||
int[] rowsAffected = ps1.executeBatch();
|
||||
assert rowsAffected.length == messages;
|
||||
for(int i = 0; i < rowsAffected.length; i++) {
|
||||
assert rowsAffected[i] <= 1;
|
||||
}
|
||||
ps1.close();
|
||||
// Cascade on delete
|
||||
sql = "DELETE FROM outstandingBatches WHERE batchId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setBytes(1, b.getBytes());
|
||||
int rowsAffected1 = ps.executeUpdate();
|
||||
assert rowsAffected1 <= 1;
|
||||
ps.close();
|
||||
} catch(SQLException e) {
|
||||
tryToClose(rs);
|
||||
tryToClose(ps);
|
||||
tryToClose(ps1);
|
||||
tryToClose(txn);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Set<BatchId> removeBatchesToAck(Connection txn, NeighbourId n)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
@@ -999,48 +1051,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
|
||||
public void removeLostBatch(Connection txn, NeighbourId n, BatchId b)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null, ps1 = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT messageId FROM outstandingMessages"
|
||||
+ " WHERE neighbourId = ? AND batchId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, n.getInt());
|
||||
ps.setBytes(2, b.getBytes());
|
||||
rs = ps.executeQuery();
|
||||
sql = "UPDATE statuses SET status = ?"
|
||||
+ " WHERE messageId = ? AND neighbourId = ? AND status = ?";
|
||||
ps1 = txn.prepareStatement(sql);
|
||||
ps1.setShort(1, (short) Status.NEW.ordinal());
|
||||
ps1.setInt(3, n.getInt());
|
||||
ps1.setShort(4, (short) Status.SENT.ordinal());
|
||||
int messages = 0;
|
||||
while(rs.next()) {
|
||||
messages++;
|
||||
ps1.setBytes(2, rs.getBytes(1));
|
||||
ps1.addBatch();
|
||||
}
|
||||
rs.close();
|
||||
ps.close();
|
||||
int[] rowsAffected = ps1.executeBatch();
|
||||
assert rowsAffected.length == messages;
|
||||
for(int i = 0; i < rowsAffected.length; i++) {
|
||||
assert rowsAffected[i] <= 1;
|
||||
}
|
||||
ps1.close();
|
||||
sql = "DELETE FROM outstandingBatches WHERE batchId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setBytes(1, b.getBytes());
|
||||
int rowsAffected1 = ps.executeUpdate();
|
||||
assert rowsAffected1 <= 1;
|
||||
ps.close();
|
||||
} catch(SQLException e) {
|
||||
tryToClose(rs);
|
||||
tryToClose(ps);
|
||||
tryToClose(ps1);
|
||||
tryToClose(txn);
|
||||
throw new DbException(e);
|
||||
}
|
||||
removeBatch(txn, n, b, Status.NEW);
|
||||
}
|
||||
|
||||
public void removeMessage(Connection txn, MessageId m) throws DbException {
|
||||
@@ -1059,36 +1070,6 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public Set<MessageId> removeOutstandingBatch(Connection txn, NeighbourId n,
|
||||
BatchId b) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT messageId FROM outstandingMessages"
|
||||
+ " WHERE neighbourId = ? AND batchId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, n.getInt());
|
||||
ps.setBytes(2, b.getBytes());
|
||||
rs = ps.executeQuery();
|
||||
Set<MessageId> messages = new HashSet<MessageId>();
|
||||
while(rs.next()) messages.add(new MessageId(rs.getBytes(1)));
|
||||
rs.close();
|
||||
ps.close();
|
||||
sql = "DELETE FROM outstandingBatches WHERE batchId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setBytes(1, b.getBytes());
|
||||
int rowsAffected = ps.executeUpdate();
|
||||
assert rowsAffected <= 1;
|
||||
ps.close();
|
||||
return messages.isEmpty() ? null : messages;
|
||||
} catch(SQLException e) {
|
||||
tryToClose(rs);
|
||||
tryToClose(ps);
|
||||
tryToClose(txn);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeSubscription(Connection txn, GroupId g)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
|
||||
@@ -10,7 +10,6 @@ import java.util.logging.Logger;
|
||||
import net.sf.briar.api.db.DbException;
|
||||
import net.sf.briar.api.db.NeighbourId;
|
||||
import net.sf.briar.api.db.Rating;
|
||||
import net.sf.briar.api.db.Status;
|
||||
import net.sf.briar.api.protocol.AuthorId;
|
||||
import net.sf.briar.api.protocol.Batch;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
@@ -53,7 +52,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("cleaner");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
for(MessageId m : db.getOldMessages(txn, size)) {
|
||||
removeMessage(txn, m);
|
||||
@@ -99,7 +98,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n);
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("addNeighbour");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.addNeighbour(txn, n);
|
||||
db.commitTransaction(txn);
|
||||
@@ -120,7 +119,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
subscriptionLock.readLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("addLocallyGeneratedMessage");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
if(db.containsSubscription(txn, m.getGroup())) {
|
||||
boolean added = storeMessage(txn, m, null);
|
||||
@@ -148,7 +147,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
public Rating getRating(AuthorId a) throws DbException {
|
||||
ratingLock.readLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("getRating");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
Rating r = db.getRating(txn, a);
|
||||
db.commitTransaction(txn);
|
||||
@@ -167,7 +166,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
ratingLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("setRating");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
Rating old = db.setRating(txn, a, r);
|
||||
// Update the sendability of the author's messages
|
||||
@@ -191,7 +190,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
public Set<GroupId> getSubscriptions() throws DbException {
|
||||
subscriptionLock.readLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("getSubscriptions");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
HashSet<GroupId> subs = new HashSet<GroupId>();
|
||||
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
|
||||
@@ -210,7 +209,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
|
||||
subscriptionLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("subscribe");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.addSubscription(txn, g);
|
||||
db.commitTransaction(txn);
|
||||
@@ -231,7 +230,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
subscriptionLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("unsubscribe");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.removeSubscription(txn, g);
|
||||
db.commitTransaction(txn);
|
||||
@@ -255,7 +254,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
// Ack all batches received from the neighbour
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("generateBundle:acks");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int numAcks = 0;
|
||||
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
|
||||
@@ -275,7 +274,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
// Add a list of subscriptions
|
||||
subscriptionLock.readLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("generateBundle:subscriptions");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int numSubs = 0;
|
||||
for(GroupId g : db.getSubscriptions(txn)) {
|
||||
@@ -316,7 +315,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
Batch b;
|
||||
neighbourLock.readLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("fillBatch:read");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
capacity = Math.min(capacity, Batch.CAPACITY);
|
||||
Iterator<MessageId> it =
|
||||
@@ -344,7 +343,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
// Record the contents of the batch
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("fillBatch:write");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
assert !sent.isEmpty();
|
||||
db.addOutstandingBatch(txn, n, b.getId(), sent);
|
||||
@@ -371,23 +370,12 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
int acks = 0, expired = 0;
|
||||
int acks = 0;
|
||||
for(BatchId ack : b.getAcks()) {
|
||||
acks++;
|
||||
Txn txn = db.startTransaction("receiveBundle:acks");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
Iterable<MessageId> batch =
|
||||
db.removeOutstandingBatch(txn, n, ack);
|
||||
// May be null if the batch was empty or has expired
|
||||
if(batch == null) {
|
||||
expired++;
|
||||
} else {
|
||||
for(MessageId m : batch) {
|
||||
// Don't re-create statuses for expired messages
|
||||
if(db.containsMessage(txn, m))
|
||||
db.setStatus(txn, n, m, Status.SEEN);
|
||||
}
|
||||
}
|
||||
db.removeAckedBatch(txn, n, ack);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -395,8 +383,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
}
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Received " + acks + " acks, " + expired
|
||||
+ " expired");
|
||||
LOG.fine("Received " + acks + " acks");
|
||||
} finally {
|
||||
neighbourLock.writeLock().unlock();
|
||||
}
|
||||
@@ -406,7 +393,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
// Update the neighbour's subscriptions
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("receiveBundle:subscriptions");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.clearSubscriptions(txn, n);
|
||||
int subs = 0;
|
||||
@@ -435,7 +422,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
subscriptionLock.readLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("receiveBundle:batch");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int received = 0, stored = 0;
|
||||
for(Message m : batch.getMessages()) {
|
||||
@@ -471,7 +458,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("receiveBundle:findLost");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
lost = db.addReceivedBundle(txn, n, b.getId());
|
||||
db.commitTransaction(txn);
|
||||
@@ -490,7 +477,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
try {
|
||||
neighbourLock.writeLock().lock();
|
||||
try {
|
||||
Txn txn = db.startTransaction("receiveBundle:removeLost");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Removing lost batch");
|
||||
|
||||
@@ -9,7 +9,6 @@ import java.util.logging.Logger;
|
||||
import net.sf.briar.api.db.DbException;
|
||||
import net.sf.briar.api.db.NeighbourId;
|
||||
import net.sf.briar.api.db.Rating;
|
||||
import net.sf.briar.api.db.Status;
|
||||
import net.sf.briar.api.protocol.AuthorId;
|
||||
import net.sf.briar.api.protocol.Batch;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
@@ -45,7 +44,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
protected void expireMessages(long size) throws DbException {
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
Txn txn = db.startTransaction("cleaner");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
for(MessageId m : db.getOldMessages(txn, size)) {
|
||||
removeMessage(txn, m);
|
||||
@@ -74,7 +73,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
public void addNeighbour(NeighbourId n) throws DbException {
|
||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n);
|
||||
synchronized(neighbourLock) {
|
||||
Txn txn = db.startTransaction("addNeighbour");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.addNeighbour(txn, n);
|
||||
db.commitTransaction(txn);
|
||||
@@ -90,7 +89,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction("addLocallyGeneratedMessage");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
if(db.containsSubscription(txn, m.getGroup())) {
|
||||
boolean added = storeMessage(txn, m, null);
|
||||
@@ -111,7 +110,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
|
||||
public Rating getRating(AuthorId a) throws DbException {
|
||||
synchronized(ratingLock) {
|
||||
Txn txn = db.startTransaction("getRating");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
Rating r = db.getRating(txn, a);
|
||||
db.commitTransaction(txn);
|
||||
@@ -126,7 +125,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
public void setRating(AuthorId a, Rating r) throws DbException {
|
||||
synchronized(messageLock) {
|
||||
synchronized(ratingLock) {
|
||||
Txn txn = db.startTransaction("setRating");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
Rating old = db.setRating(txn, a, r);
|
||||
// Update the sendability of the author's messages
|
||||
@@ -145,7 +144,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
|
||||
public Set<GroupId> getSubscriptions() throws DbException {
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction("getSubscriptions");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
HashSet<GroupId> subs = new HashSet<GroupId>();
|
||||
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
|
||||
@@ -161,7 +160,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
public void subscribe(GroupId g) throws DbException {
|
||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction("subscribe");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.addSubscription(txn, g);
|
||||
db.commitTransaction(txn);
|
||||
@@ -177,7 +176,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction("unsubscribe");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.removeSubscription(txn, g);
|
||||
db.commitTransaction(txn);
|
||||
@@ -194,7 +193,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n);
|
||||
// Ack all batches received from the neighbour
|
||||
synchronized(neighbourLock) {
|
||||
Txn txn = db.startTransaction("generateBundle:acks");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int numAcks = 0;
|
||||
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
|
||||
@@ -211,7 +210,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
// Add a list of subscriptions
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction("generateBundle:subscriptions");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int numSubs = 0;
|
||||
for(GroupId g : db.getSubscriptions(txn)) {
|
||||
@@ -246,7 +245,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
private Batch fillBatch(NeighbourId n, long capacity) throws DbException {
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
Txn txn = db.startTransaction("fillBatch");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
capacity = Math.min(capacity, Batch.CAPACITY);
|
||||
Iterator<MessageId> it =
|
||||
@@ -283,23 +282,12 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
// Mark all messages in acked batches as seen
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
int acks = 0, expired = 0;
|
||||
int acks = 0;
|
||||
for(BatchId ack : b.getAcks()) {
|
||||
acks++;
|
||||
Txn txn = db.startTransaction("receiveBundle:acks");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
Iterable<MessageId> batch =
|
||||
db.removeOutstandingBatch(txn, n, ack);
|
||||
// May be null if the batch was empty or has expired
|
||||
if(batch == null) {
|
||||
expired++;
|
||||
} else {
|
||||
for(MessageId m : batch) {
|
||||
// Don't re-create statuses for expired messages
|
||||
if(db.containsMessage(txn, m))
|
||||
db.setStatus(txn, n, m, Status.SEEN);
|
||||
}
|
||||
}
|
||||
db.removeAckedBatch(txn, n, ack);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -307,13 +295,12 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
}
|
||||
}
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Received " + acks + " acks, " + expired
|
||||
+ " expired");
|
||||
LOG.fine("Received " + acks + " acks");
|
||||
}
|
||||
}
|
||||
// Update the neighbour's subscriptions
|
||||
synchronized(neighbourLock) {
|
||||
Txn txn = db.startTransaction("receiveBundle:subscriptions");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
db.clearSubscriptions(txn, n);
|
||||
int subs = 0;
|
||||
@@ -337,7 +324,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
synchronized(subscriptionLock) {
|
||||
Txn txn = db.startTransaction("receiveBundle:batch");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
int received = 0, stored = 0;
|
||||
for(Message m : batch.getMessages()) {
|
||||
@@ -365,7 +352,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
Set<BatchId> lost;
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
Txn txn = db.startTransaction("receiveBundle:findLost");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
lost = db.addReceivedBundle(txn, n, b.getId());
|
||||
db.commitTransaction(txn);
|
||||
@@ -378,7 +365,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
|
||||
for(BatchId batch : lost) {
|
||||
synchronized(messageLock) {
|
||||
synchronized(neighbourLock) {
|
||||
Txn txn = db.startTransaction("receiveBundle:removeLost");
|
||||
Txn txn = db.startTransaction();
|
||||
try {
|
||||
if(LOG.isLoggable(Level.FINE))
|
||||
LOG.fine("Removing lost batch");
|
||||
|
||||
Reference in New Issue
Block a user