Initial commit with new directory structure.

This commit is contained in:
akwizgran
2011-06-21 18:01:28 +01:00
commit cd4f99df3d
98 changed files with 5811 additions and 0 deletions

View File

@@ -0,0 +1,114 @@
package net.sf.briar.db;
import java.util.Set;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NeighbourId;
import net.sf.briar.api.db.Rating;
import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.BundleId;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
interface Database<T> {
void open(boolean resume) throws DbException;
void close() throws DbException;
T startTransaction(String name) throws DbException;
void abortTransaction(T txn);
void commitTransaction(T txn) throws DbException;
// Locking: neighbours write
void addBatchToAck(T txn, NeighbourId n, BatchId b) throws DbException;
// Locking: neighbours write
void addNeighbour(T txn, NeighbourId n) throws DbException;
// Locking: neighbours write, messages read
void addOutstandingBatch(T txn, NeighbourId n, BatchId b, Set<MessageId> sent) throws DbException;
// Locking: neighbours write, messages read
Set<BatchId> addReceivedBundle(T txn, NeighbourId n, BundleId b) throws DbException;
// Locking: subscriptions write
void addSubscription(T txn, GroupId g) throws DbException;
// Locking: neighbours write
void addSubscription(T txn, NeighbourId n, GroupId g) throws DbException;
// Locking: neighbours write
void clearSubscriptions(T txn, NeighbourId n) throws DbException;
// Locking: messages read
boolean containsMessage(T txn, MessageId m) throws DbException;
// Locking: subscriptions read
boolean containsSubscription(T txn, GroupId g) throws DbException;
// Locking: messages read
long getFreeSpace() throws DbException;
// Locking: messages read
Message getMessage(T txn, MessageId m) throws DbException;
// Locking: messages read
Iterable<MessageId> getMessagesByAuthor(T txn, AuthorId a) throws DbException;
// Locking: messages read
Iterable<MessageId> getMessagesByParent(T txn, MessageId m) throws DbException;
// Locking: neighbours read
Set<NeighbourId> getNeighbours(T txn) throws DbException;
// Locking: messages read
Iterable<MessageId> getOldMessages(T txn, long size) throws DbException;
// Locking: messages read
MessageId getParent(T txn, MessageId m) throws DbException;
// Locking: ratings read
Rating getRating(T txn, AuthorId a) throws DbException;
// Locking: messages read
int getSendability(T txn, MessageId m) throws DbException;
// Locking: neighbours read, messages read
Iterable<MessageId> getSendableMessages(T txn, NeighbourId n, long capacity) throws DbException;
// Locking: subscriptions read
Set<GroupId> getSubscriptions(T txn) throws DbException;
// Locking: messages write
boolean addMessage(T txn, Message m) throws DbException;
// Locking: ratings write
Rating setRating(T txn, AuthorId a, Rating r) throws DbException;
// Locking: messages write
void setSendability(T txn, MessageId m, int sendability) throws DbException;
// Locking: neighbours read, n write
Set<BatchId> removeBatchesToAck(T txn, NeighbourId n) throws DbException;
// Locking: neighbours write, messages read
void removeLostBatch(T txn, NeighbourId n, BatchId b) throws DbException;
// Locking: neighbours write, messages write
void removeMessage(T txn, MessageId m) throws DbException;
// Locking: neighbours write
Set<MessageId> removeOutstandingBatch(T txn, NeighbourId n, BatchId b) throws DbException;
// Locking: subscriptions write, neighbours write, messages write
void removeSubscription(T txn, GroupId g) throws DbException;
// Locking: neighbours write, messages read
void setStatus(T txn, NeighbourId n, MessageId m, Status s) throws DbException;
}

View File

@@ -0,0 +1,209 @@
package net.sf.briar.db;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NeighbourId;
import net.sf.briar.api.db.Rating;
import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import com.google.inject.Provider;
abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent {
protected final Database<Txn> db;
protected final Provider<Batch> batchProvider;
private final Object spaceLock = new Object();
private final Object writeLock = new Object();
private long bytesStoredSinceLastCheck = 0L; // Locking: spaceLock
private long timeOfLastCheck = 0L; // Locking: spaceLock
private volatile boolean writesAllowed = true;
DatabaseComponentImpl(Database<Txn> db, Provider<Batch> batchProvider) {
this.db = db;
this.batchProvider = batchProvider;
startCleaner();
}
protected abstract void expireMessages(long size) throws DbException;
// Locking: messages write
private int calculateSendability(Txn txn, Message m) throws DbException {
int sendability = 0;
// One point for a good rating
if(getRating(m.getAuthor()) == Rating.GOOD) sendability++;
// One point per sendable child (backward inclusion)
for(MessageId kid : db.getMessagesByParent(txn, m.getId())) {
Integer kidSendability = db.getSendability(txn, kid);
assert kidSendability != null;
if(kidSendability > 0) sendability++;
}
return sendability;
}
private void checkFreeSpaceAndClean() throws DbException {
long freeSpace = db.getFreeSpace();
while(freeSpace < MIN_FREE_SPACE) {
// If disk space is critical, disable the storage of new messages
if(freeSpace < CRITICAL_FREE_SPACE) {
System.out.println("Critical cleanup");
writesAllowed = false;
} else {
System.out.println("Normal cleanup");
}
expireMessages(BYTES_PER_SWEEP);
Thread.yield();
freeSpace = db.getFreeSpace();
// If disk space is no longer critical, re-enable writes
if(freeSpace >= CRITICAL_FREE_SPACE && !writesAllowed) {
writesAllowed = true;
synchronized(writeLock) {
writeLock.notifyAll();
}
}
}
}
// Locking: messages write, neighbours write
protected void removeMessage(Txn txn, MessageId id) throws DbException {
Integer sendability = db.getSendability(txn, id);
assert sendability != null;
if(sendability > 0) updateAncestorSendability(txn, id, false);
db.removeMessage(txn, id);
}
private boolean shouldCheckFreeSpace() {
synchronized(spaceLock) {
long now = System.currentTimeMillis();
if(bytesStoredSinceLastCheck > MAX_BYTES_BETWEEN_SPACE_CHECKS) {
System.out.println(bytesStoredSinceLastCheck
+ " bytes stored since last check");
bytesStoredSinceLastCheck = 0L;
timeOfLastCheck = now;
return true;
}
if(now - timeOfLastCheck > MAX_MS_BETWEEN_SPACE_CHECKS) {
System.out.println((now - timeOfLastCheck)
+ " ms since last check");
bytesStoredSinceLastCheck = 0L;
timeOfLastCheck = now;
return true;
}
}
return false;
}
private void startCleaner() {
Runnable cleaner = new Runnable() {
public void run() {
try {
while(true) {
if(shouldCheckFreeSpace()) {
checkFreeSpaceAndClean();
} else {
try {
Thread.sleep(CLEANER_SLEEP_MS);
} catch(InterruptedException ignored) {}
}
}
} catch(Throwable t) {
// FIXME: Work out what to do here
t.printStackTrace();
System.exit(1);
}
}
};
new Thread(cleaner).start();
}
// Locking: messages write, neighbours write
protected boolean storeMessage(Txn txn, Message m, NeighbourId sender)
throws DbException {
boolean added = db.addMessage(txn, m);
// Mark the message as seen by the sender
MessageId id = m.getId();
if(sender != null) db.setStatus(txn, sender, id, Status.SEEN);
if(added) {
// Mark the message as unseen by other neighbours
for(NeighbourId n : db.getNeighbours(txn)) {
if(!n.equals(sender)) db.setStatus(txn, n, id, Status.NEW);
}
// Calculate and store the message's sendability
int sendability = calculateSendability(txn, m);
db.setSendability(txn, id, sendability);
if(sendability > 0) updateAncestorSendability(txn, id, true);
// Count the bytes stored
synchronized(spaceLock) {
bytesStoredSinceLastCheck += m.getSize();
}
}
return added;
}
// Locking: messages write
private int updateAncestorSendability(Txn txn, MessageId m,
boolean increment) throws DbException {
int affected = 0;
boolean changed = true;
while(changed) {
MessageId parent = db.getParent(txn, m);
if(parent.equals(MessageId.NONE)) break;
if(!db.containsMessage(txn, parent)) break;
Integer parentSendability = db.getSendability(txn, parent);
assert parentSendability != null;
if(increment) {
parentSendability++;
changed = parentSendability == 1;
if(changed) affected++;
} else {
assert parentSendability > 0;
parentSendability--;
changed = parentSendability == 0;
if(changed) affected++;
}
db.setSendability(txn, parent, parentSendability);
m = parent;
}
return affected;
}
// Locking: messages write
protected void updateAuthorSendability(Txn txn, AuthorId a,
boolean increment) throws DbException {
int direct = 0, indirect = 0;
for(MessageId id : db.getMessagesByAuthor(txn, a)) {
int sendability = db.getSendability(txn, id);
if(increment) {
db.setSendability(txn, id, sendability + 1);
if(sendability == 0) {
direct++;
indirect += updateAncestorSendability(txn, id, true);
}
} else {
assert sendability > 0;
db.setSendability(txn, id, sendability - 1);
if(sendability == 1) {
direct++;
indirect += updateAncestorSendability(txn, id, false);
}
}
}
System.out.println(direct + " messages affected directly, "
+ indirect + " indirectly");
}
protected void waitForPermissionToWrite() {
synchronized(writeLock) {
while(!writesAllowed) {
System.out.println("Waiting for permission to write");
try {
writeLock.wait();
} catch(InterruptedException ignored) {}
}
}
}
}

View File

@@ -0,0 +1,22 @@
package net.sf.briar.db;
import net.sf.briar.api.crypto.Password;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabasePassword;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
public class DatabaseModule extends AbstractModule {
@Override
protected void configure() {
bind(Database.class).to(H2Database.class);
bind(DatabaseComponent.class).to(ReadWriteLockDatabaseComponent.class).in(Singleton.class);
bind(Password.class).annotatedWith(DatabasePassword.class).toInstance(new Password() {
public char[] getPassword() {
return "fixme fixme".toCharArray();
}
});
}
}

View File

@@ -0,0 +1,70 @@
package net.sf.briar.db;
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Properties;
import net.sf.briar.api.crypto.Password;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabasePassword;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.protocol.MessageFactory;
import net.sf.briar.util.FileUtils;
import com.google.inject.Inject;
class H2Database extends JdbcDatabase {
private final Password password;
private final File home;
private final String url;
@Inject
H2Database(MessageFactory messageFactory,
@DatabasePassword Password password) {
super(messageFactory, "BINARY(32)");
this.password = password;
home = new File(FileUtils.getBriarDirectory(), "Data/db/db");
url = "jdbc:h2:split:" + home.getPath()
+ ";CIPHER=AES;DB_CLOSE_ON_EXIT=false";
}
public void open(boolean resume) throws DbException {
super.open(resume, home.getParentFile(), "org.h2.Driver");
}
public void close() throws DbException {
System.out.println("Closing database");
try {
super.closeAllConnections();
} catch(SQLException e) {
throw new DbException(e);
}
}
public long getFreeSpace() throws DbException {
File dir = home.getParentFile();
long free = dir.getFreeSpace();
long used = getDiskSpace(dir);
long quota = DatabaseComponent.MAX_DB_SIZE - used;
long min = Math.min(free, quota);
System.out.println("Free space: " + min);
return min;
}
@Override
protected Connection createConnection() throws SQLException {
Properties props = new Properties();
props.setProperty("user", "b");
char[] passwordArray = password.getPassword();
props.put("password", passwordArray);
try {
return DriverManager.getConnection(url, props);
} finally {
Arrays.fill(passwordArray, (char) 0);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,497 @@
package net.sf.briar.db;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NeighbourId;
import net.sf.briar.api.db.Rating;
import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Bundle;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import com.google.inject.Inject;
import com.google.inject.Provider;
class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
/*
* Locks must always be acquired in alphabetical order. See the Database
* interface to find out which calls require which locks. Note: this
* implementation can allow writers to starve.
*/
private final ReentrantReadWriteLock messageLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock neighbourLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock ratingLock =
new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock subscriptionLock =
new ReentrantReadWriteLock(true);
@Inject
ReadWriteLockDatabaseComponent(Database<Txn> db,
Provider<Batch> batchProvider) {
super(db, batchProvider);
}
protected void expireMessages(long size) throws DbException {
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction("cleaner");
try {
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
public void close() throws DbException {
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
try {
ratingLock.writeLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
db.close();
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
ratingLock.writeLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
public void addNeighbour(NeighbourId n) throws DbException {
System.out.println("Adding neighbour " + n);
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction("addNeighbour");
try {
db.addNeighbour(txn, n);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
waitForPermissionToWrite();
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction("addLocallyGeneratedMessage");
try {
if(db.containsSubscription(txn, m.getGroup())) {
boolean added = storeMessage(txn, m, null);
assert added;
} else {
System.out.println("Not subscribed");
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
public Rating getRating(AuthorId a) throws DbException {
ratingLock.readLock().lock();
try {
Txn txn = db.startTransaction("getRating");
try {
Rating r = db.getRating(txn, a);
db.commitTransaction(txn);
return r;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
ratingLock.readLock().unlock();
}
}
public void setRating(AuthorId a, Rating r) throws DbException {
messageLock.writeLock().lock();
try {
ratingLock.writeLock().lock();
try {
Txn txn = db.startTransaction("setRating");
try {
Rating old = db.setRating(txn, a, r);
// Update the sendability of the author's messages
if(r == Rating.GOOD && old != Rating.GOOD)
updateAuthorSendability(txn, a, true);
else if(r != Rating.GOOD && old == Rating.GOOD)
updateAuthorSendability(txn, a, false);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
ratingLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
public Set<GroupId> getSubscriptions() throws DbException {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction("getSubscriptions");
try {
HashSet<GroupId> subs = new HashSet<GroupId>();
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
}
public void subscribe(GroupId g) throws DbException {
System.out.println("Subscribing to " + g);
subscriptionLock.writeLock().lock();
try {
Txn txn = db.startTransaction("subscribe");
try {
db.addSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
}
public void unsubscribe(GroupId g) throws DbException {
System.out.println("Unsubscribing from " + g);
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
Txn txn = db.startTransaction("unsubscribe");
try {
db.removeSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
public void generateBundle(NeighbourId n, Bundle b) throws DbException {
System.out.println("Generating bundle for " + n);
// Ack all batches received from the neighbour
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction("generateBundle:acks");
try {
int numAcks = 0;
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
b.addAck(ack);
numAcks++;
}
System.out.println("Added " + numAcks + " acks");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
// Add a list of subscriptions
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction("generateBundle:subscriptions");
try {
int numSubs = 0;
for(GroupId g : db.getSubscriptions(txn)) {
b.addSubscription(g);
numSubs++;
}
System.out.println("Added " + numSubs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
// Add as many messages as possible to the bundle
long capacity = b.getCapacity();
while(true) {
Batch batch = fillBatch(n, capacity);
if(batch == null) break; // No more messages to send
b.addBatch(batch);
capacity -= batch.getSize();
// If the batch is less than half full, stop trying - there may be
// more messages trickling in but we can't wait forever
if(batch.getSize() * 2 < Batch.CAPACITY) break;
}
b.seal();
System.out.println("Bundle sent, " + b.getSize() + " bytes");
System.gc();
}
private Batch fillBatch(NeighbourId n, long capacity) throws DbException {
messageLock.readLock().lock();
try {
Set<MessageId> sent;
Batch b;
neighbourLock.readLock().lock();
try {
Txn txn = db.startTransaction("fillBatch:read");
try {
capacity = Math.min(capacity, Batch.CAPACITY);
Iterator<MessageId> it =
db.getSendableMessages(txn, n, capacity).iterator();
if(!it.hasNext()) {
db.commitTransaction(txn);
return null; // No more messages to send
}
sent = new HashSet<MessageId>();
b = batchProvider.get();
while(it.hasNext()) {
MessageId m = it.next();
b.addMessage(db.getMessage(txn, m));
sent.add(m);
}
b.seal();
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.readLock().unlock();
}
// Record the contents of the batch
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction("fillBatch:write");
try {
assert !sent.isEmpty();
db.addOutstandingBatch(txn, n, b.getId(), sent);
db.commitTransaction(txn);
return b;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
}
public void receiveBundle(NeighbourId n, Bundle b) throws DbException {
System.out.println("Received bundle from " + n + ", "
+ b.getSize() + " bytes");
// Mark all messages in acked batches as seen
messageLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
try {
int acks = 0, expired = 0;
for(BatchId ack : b.getAcks()) {
acks++;
Txn txn = db.startTransaction("receiveBundle:acks");
try {
Iterable<MessageId> batch =
db.removeOutstandingBatch(txn, n, ack);
// May be null if the batch was empty or has expired
if(batch == null) {
expired++;
} else {
for(MessageId m : batch) {
// Don't re-create statuses for expired messages
if(db.containsMessage(txn, m))
db.setStatus(txn, n, m, Status.SEEN);
}
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
System.out.println("Received " + acks + " acks, " + expired
+ " expired");
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
// Update the neighbour's subscriptions
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction("receiveBundle:subscriptions");
try {
db.clearSubscriptions(txn, n);
int subs = 0;
for(GroupId g : b.getSubscriptions()) {
subs++;
db.addSubscription(txn, n, g);
}
System.out.println("Received " + subs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
// Store the messages
int batches = 0;
for(Batch batch : b.getBatches()) {
batches++;
waitForPermissionToWrite();
messageLock.writeLock().lock();
try {
neighbourLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction("receiveBundle:batch");
try {
int received = 0, stored = 0;
for(Message m : batch.getMessages()) {
received++;
if(db.containsSubscription(txn, m.getGroup())) {
if(storeMessage(txn, m, n)) stored++;
}
}
System.out.println("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, n, batch.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
}
System.out.println("Received " + batches + " batches");
// Find any lost batches that need to be retransmitted
Set<BatchId> lost;
messageLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction("receiveBundle:findLost");
try {
lost = db.addReceivedBundle(txn, n, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
for(BatchId batch : lost) {
messageLock.readLock().lock();
try {
neighbourLock.writeLock().lock();
try {
Txn txn = db.startTransaction("receiveBundle:removeLost");
try {
System.out.println("Removing lost batch");
db.removeLostBatch(txn, n, batch);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
neighbourLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
}
System.gc();
}
}

View File

@@ -0,0 +1,381 @@
package net.sf.briar.db;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import com.google.inject.Inject;
import com.google.inject.Provider;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NeighbourId;
import net.sf.briar.api.db.Rating;
import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Bundle;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
/*
* Locks must always be acquired in alphabetical order. See the Database
* interface to find out which calls require which locks.
*/
private final Object messageLock = new Object();
private final Object neighbourLock = new Object();
private final Object ratingLock = new Object();
private final Object subscriptionLock = new Object();
@Inject
SynchronizedDatabaseComponent(Database<Txn> db,
Provider<Batch> batchProvider) {
super(db, batchProvider);
}
protected void expireMessages(long size) throws DbException {
synchronized(messageLock) {
synchronized(neighbourLock) {
Txn txn = db.startTransaction("cleaner");
try {
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public void close() throws DbException {
synchronized(messageLock) {
synchronized(neighbourLock) {
synchronized(ratingLock) {
synchronized(subscriptionLock) {
db.close();
}
}
}
}
}
public void addNeighbour(NeighbourId n) throws DbException {
System.out.println("Adding neighbour " + n);
synchronized(neighbourLock) {
Txn txn = db.startTransaction("addNeighbour");
try {
db.addNeighbour(txn, n);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
waitForPermissionToWrite();
synchronized(messageLock) {
synchronized(neighbourLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction("addLocallyGeneratedMessage");
try {
if(db.containsSubscription(txn, m.getGroup())) {
boolean added = storeMessage(txn, m, null);
assert added;
} else {
System.out.println("Not subscribed");
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
public Rating getRating(AuthorId a) throws DbException {
synchronized(ratingLock) {
Txn txn = db.startTransaction("getRating");
try {
Rating r = db.getRating(txn, a);
db.commitTransaction(txn);
return r;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
public void setRating(AuthorId a, Rating r) throws DbException {
synchronized(messageLock) {
synchronized(ratingLock) {
Txn txn = db.startTransaction("setRating");
try {
Rating old = db.setRating(txn, a, r);
// Update the sendability of the author's messages
if(r == Rating.GOOD && old != Rating.GOOD)
updateAuthorSendability(txn, a, true);
else if(r != Rating.GOOD && old == Rating.GOOD)
updateAuthorSendability(txn, a, false);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public Set<GroupId> getSubscriptions() throws DbException {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction("getSubscriptions");
try {
HashSet<GroupId> subs = new HashSet<GroupId>();
for(GroupId g : db.getSubscriptions(txn)) subs.add(g);
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
public void subscribe(GroupId g) throws DbException {
System.out.println("Subscribing to " + g);
synchronized(subscriptionLock) {
Txn txn = db.startTransaction("subscribe");
try {
db.addSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
public void unsubscribe(GroupId g) throws DbException {
System.out.println("Unsubscribing from " + g);
synchronized(messageLock) {
synchronized(neighbourLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction("unsubscribe");
try {
db.removeSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
public void generateBundle(NeighbourId n, Bundle b) throws DbException {
System.out.println("Generating bundle for " + n);
// Ack all batches received from the neighbour
synchronized(neighbourLock) {
Txn txn = db.startTransaction("generateBundle:acks");
try {
int numAcks = 0;
for(BatchId ack : db.removeBatchesToAck(txn, n)) {
b.addAck(ack);
numAcks++;
}
System.out.println("Added " + numAcks + " acks");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
// Add a list of subscriptions
synchronized(subscriptionLock) {
Txn txn = db.startTransaction("generateBundle:subscriptions");
try {
int numSubs = 0;
for(GroupId g : db.getSubscriptions(txn)) {
b.addSubscription(g);
numSubs++;
}
System.out.println("Added " + numSubs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
// Add as many messages as possible to the bundle
long capacity = b.getCapacity();
while(true) {
Batch batch = fillBatch(n, capacity);
if(batch == null) break; // No more messages to send
b.addBatch(batch);
capacity -= batch.getSize();
// If the batch is less than half full, stop trying - there may be
// more messages trickling in but we can't wait forever
if(batch.getSize() * 2 < Batch.CAPACITY) break;
}
b.seal();
System.out.println("Bundle sent, " + b.getSize() + " bytes");
System.gc();
}
private Batch fillBatch(NeighbourId n, long capacity) throws DbException {
synchronized(messageLock) {
synchronized(neighbourLock) {
Txn txn = db.startTransaction("fillBatch");
try {
capacity = Math.min(capacity, Batch.CAPACITY);
Iterator<MessageId> it =
db.getSendableMessages(txn, n, capacity).iterator();
if(!it.hasNext()) {
db.commitTransaction(txn);
return null; // No more messages to send
}
Batch b = batchProvider.get();
Set<MessageId> sent = new HashSet<MessageId>();
while(it.hasNext()) {
MessageId m = it.next();
b.addMessage(db.getMessage(txn, m));
sent.add(m);
}
b.seal();
// Record the contents of the batch
assert !sent.isEmpty();
db.addOutstandingBatch(txn, n, b.getId(), sent);
db.commitTransaction(txn);
return b;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public void receiveBundle(NeighbourId n, Bundle b) throws DbException {
System.out.println("Received bundle from " + n + ", "
+ b.getSize() + " bytes");
// Mark all messages in acked batches as seen
synchronized(messageLock) {
synchronized(neighbourLock) {
int acks = 0, expired = 0;
for(BatchId ack : b.getAcks()) {
acks++;
Txn txn = db.startTransaction("receiveBundle:acks");
try {
Iterable<MessageId> batch =
db.removeOutstandingBatch(txn, n, ack);
// May be null if the batch was empty or has expired
if(batch == null) {
expired++;
} else {
for(MessageId m : batch) {
// Don't re-create statuses for expired messages
if(db.containsMessage(txn, m))
db.setStatus(txn, n, m, Status.SEEN);
}
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
System.out.println("Received " + acks + " acks, " + expired
+ " expired");
}
}
// Update the neighbour's subscriptions
synchronized(neighbourLock) {
Txn txn = db.startTransaction("receiveBundle:subscriptions");
try {
db.clearSubscriptions(txn, n);
int subs = 0;
for(GroupId g : b.getSubscriptions()) {
subs++;
db.addSubscription(txn, n, g);
}
System.out.println("Received " + subs + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
// Store the messages
int batches = 0;
for(Batch batch : b.getBatches()) {
batches++;
waitForPermissionToWrite();
synchronized(messageLock) {
synchronized(neighbourLock) {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction("receiveBundle:batch");
try {
int received = 0, stored = 0;
for(Message m : batch.getMessages()) {
received++;
if(db.containsSubscription(txn, m.getGroup())) {
if(storeMessage(txn, m, n)) stored++;
}
}
System.out.println("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, n, batch.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
System.out.println("Received " + batches + " batches");
// Find any lost batches that need to be retransmitted
Set<BatchId> lost;
synchronized(messageLock) {
synchronized(neighbourLock) {
Txn txn = db.startTransaction("receiveBundle:findLost");
try {
lost = db.addReceivedBundle(txn, n, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
for(BatchId batch : lost) {
synchronized(messageLock) {
synchronized(neighbourLock) {
Txn txn = db.startTransaction("receiveBundle:removeLost");
try {
System.out.println("Removing lost batch");
db.removeLostBatch(txn, n, batch);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
System.gc();
}
}