Added the ability to remove neighbours from the database (untested).

This commit is contained in:
akwizgran
2011-06-29 12:54:00 +01:00
parent 6960f64982
commit ed0174a91b
6 changed files with 689 additions and 425 deletions

View File

@@ -32,9 +32,11 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
* implementation can allow writers to starve.
*/
private final ReentrantReadWriteLock contactLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock messageLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock neighbourLock =
private final ReentrantReadWriteLock messageStatusLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock ratingLock =
new ReentrantReadWriteLock(true);
@@ -48,85 +50,16 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
protected void expireMessages(long size) throws DbException {
messageLock.writeLock().lock();
contactLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
messageLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
public void close() throws DbException {
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
try {
ratingLock.writeLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
db.close();
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
ratingLock.writeLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
public void addNeighbour(NeighbourId n) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n);
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
db.addNeighbour(txn, n);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
waitForPermissionToWrite();
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
if(db.containsSubscription(txn, m.getGroup())) {
boolean added = storeMessage(txn, m, null);
assert added;
} else {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Not subscribed");
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
db.commitTransaction(txn);
} catch(DbException e) {
@@ -134,13 +67,102 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
messageStatusLock.writeLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
messageLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
contactLock.readLock().unlock();
}
}
public void close() throws DbException {
contactLock.writeLock().lock();
try {
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
ratingLock.writeLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
db.close();
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
ratingLock.writeLock().unlock();
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
}
}
public void addNeighbour(NeighbourId n) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n);
contactLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
db.addNeighbour(txn, n);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
}
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
waitForPermissionToWrite();
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
if(db.containsSubscription(txn, m.getGroup())) {
boolean added = storeMessage(txn, m, null);
assert added;
} else {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Not subscribed");
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
@@ -161,6 +183,28 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public void removeNeighbour(NeighbourId n) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing neighbour " + n);
contactLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
db.removeNeighbour(txn, n);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
}
}
public void setRating(AuthorId a, Rating r) throws DbException {
messageLock.writeLock().lock();
try {
@@ -224,72 +268,89 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void unsubscribe(GroupId g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g);
messageLock.writeLock().lock();
contactLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
messageLock.writeLock().lock();
try {
subscriptionLock.writeLock().lock();
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
subscriptionLock.writeLock().lock();
try {
db.removeSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
Txn txn = db.startTransaction();
try {
db.removeSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
subscriptionLock.writeLock().unlock();
messageStatusLock.writeLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
messageLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
contactLock.readLock().unlock();
}
}
public void generateBundle(NeighbourId n, Bundle b) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n);
// Ack all batches received from the neighbour
neighbourLock.writeLock().lock();
contactLock.readLock().lock();
try {
Txn txn = db.startTransaction();
if(!containsNeighbour(n)) return;
messageStatusLock.writeLock().lock();
try {
int numAcks = 0;
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
b.addAck(ack);
numAcks++;
Txn txn = db.startTransaction();
try {
int numAcks = 0;
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
b.addAck(ack);
numAcks++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + numAcks + " acks");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + numAcks + " acks");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
contactLock.readLock().unlock();
}
// Add a list of subscriptions
subscriptionLock.readLock().lock();
contactLock.readLock().lock();
try {
Txn txn = db.startTransaction();
if(!containsNeighbour(n)) return;
subscriptionLock.readLock().lock();
try {
int numSubs = 0;
for(GroupId g : db.getSubscriptions(txn)) {
b.addSubscription(g);
numSubs++;
Txn txn = db.startTransaction();
try {
int numSubs = 0;
for(GroupId g : db.getSubscriptions(txn)) {
b.addSubscription(g);
numSubs++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + numSubs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + numSubs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
subscriptionLock.readLock().unlock();
contactLock.readLock().unlock();
}
// Add as many messages as possible to the bundle
long capacity = b.getCapacity();
@@ -309,55 +370,61 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
private Batch fillBatch(NeighbourId n, long capacity) throws DbException {
messageLock.readLock().lock();
contactLock.readLock().lock();
try {
Set<MessageId> sent;
Batch b;
neighbourLock.readLock().lock();
if(!containsNeighbour(n)) return null;
messageLock.readLock().lock();
try {
Txn txn = db.startTransaction();
Set<MessageId> sent;
Batch b;
messageStatusLock.readLock().lock();
try {
capacity = Math.min(capacity, Batch.CAPACITY);
Iterator<MessageId> it =
db.getSendableMessages(txn, n, capacity).iterator();
if(!it.hasNext()) {
Txn txn = db.startTransaction();
try {
capacity = Math.min(capacity, Batch.CAPACITY);
Iterator<MessageId> it =
db.getSendableMessages(txn, n, capacity).iterator();
if(!it.hasNext()) {
db.commitTransaction(txn);
return null; // No more messages to send
}
sent = new HashSet<MessageId>();
b = batchProvider.get();
while(it.hasNext()) {
MessageId m = it.next();
b.addMessage(db.getMessage(txn, m));
sent.add(m);
}
b.seal();
db.commitTransaction(txn);
return null; // No more messages to send
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
sent = new HashSet<MessageId>();
b = batchProvider.get();
while(it.hasNext()) {
MessageId m = it.next();
b.addMessage(db.getMessage(txn, m));
sent.add(m);
}
b.seal();
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} finally {
messageStatusLock.readLock().unlock();
}
} finally {
neighbourLock.readLock().unlock();
}
// Record the contents of the batch
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
// Record the contents of the batch
messageStatusLock.writeLock().lock();
try {
assert !sent.isEmpty();
db.addOutstandingBatch(txn, n, b.getId(), sent);
db.commitTransaction(txn);
return b;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
Txn txn = db.startTransaction();
try {
assert !sent.isEmpty();
db.addOutstandingBatch(txn, n, b.getId(), sent);
db.commitTransaction(txn);
return b;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
messageLock.readLock().unlock();
}
} finally {
messageLock.readLock().unlock();
contactLock.readLock().unlock();
}
}
@@ -366,133 +433,164 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
LOG.fine("Received bundle from " + n + ", "
+ b.getSize() + " bytes");
// Mark all messages in acked batches as seen
messageLock.readLock().lock();
contactLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
if(!containsNeighbour(n)) return;
messageLock.readLock().lock();
try {
int acks = 0;
for(BatchId ack : b.getAcks()) {
acks++;
Txn txn = db.startTransaction();
try {
db.removeAckedBatch(txn, n, ack);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + acks + " acks");
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
// Update the neighbour's subscriptions
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
db.clearSubscriptions(txn, n);
int subs = 0;
for(GroupId g : b.getSubscriptions()) {
subs++;
db.addSubscription(txn, n, g);
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + subs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
// Store the messages
int batches = 0;
for(Batch batch : b.getBatches()) {
batches++;
waitForPermissionToWrite();
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
messageStatusLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
int acks = 0;
for(BatchId ack : b.getAcks()) {
acks++;
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : batch.getMessages()) {
received++;
if(db.containsSubscription(txn, m.getGroup())) {
if(storeMessage(txn, m, n)) stored++;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, n, batch.getId());
db.removeAckedBatch(txn, n, ack);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + acks + " acks");
} finally {
neighbourLock.writeLock().unlock();
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
// Find any lost batches that need to be retransmitted
Set<BatchId> lost;
messageLock.readLock().lock();
// Update the neighbour's subscriptions
contactLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
if(!containsNeighbour(n)) return;
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
lost = db.addReceivedBundle(txn, n, b.getId());
db.clearSubscriptions(txn, n);
int subs = 0;
for(GroupId g : b.getSubscriptions()) {
subs++;
db.addSubscription(txn, n, g);
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + subs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
contactLock.readLock().lock();
}
for(BatchId batch : lost) {
// Store the messages
int batches = 0;
for(Batch batch : b.getBatches()) {
batches++;
waitForPermissionToWrite();
contactLock.readLock().lock();
try {
if(!containsNeighbour(n)) return;
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : batch.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsSubscription(txn, g)) {
if(storeMessage(txn, m, n)) stored++;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, n, batch.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
// Find any lost batches that need to be retransmitted
Set<BatchId> lost;
contactLock.readLock().lock();
try {
if(!containsNeighbour(n)) return;
messageLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Removing lost batch");
db.removeLostBatch(txn, n, batch);
lost = db.addReceivedBundle(txn, n, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
for(BatchId batch : lost) {
contactLock.readLock().lock();
try {
if(!containsNeighbour(n)) return;
messageLock.readLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Removing lost batch");
db.removeLostBatch(txn, n, batch);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
System.gc();
}