Don't store subscription or transport updates that are older than those already received. Also some small changes to DatabaseComponent impls for readability.

This commit is contained in:
akwizgran
2011-07-14 13:53:13 +01:00
parent 836d30f6df
commit fcedc34d10
10 changed files with 397 additions and 216 deletions

View File

@@ -146,68 +146,78 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void generateBundle(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
Set<BatchId> acks;
Set<GroupId> subs;
Map<String, String> transports;
// Add acks
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
acks = db.removeBatchesToAck(txn, c);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + acks.size() + " acks");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
// Add subscriptions
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
subs = db.getSubscriptions(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
// Add transport details
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
transports = db.getTransports(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
Set<BatchId> acks = generateAcks(c);
Set<GroupId> subs = generateSubscriptions(c);
Map<String, String> transports = generateTransports(c);
// Add the header to the bundle
b.addHeader(acks, subs, transports);
// Add as many messages as possible to the bundle
while(fillBatch(c, b));
while(generateBatch(c, b));
b.finish();
if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated");
System.gc();
}
private boolean fillBatch(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
private Set<BatchId> generateAcks(ContactId c) throws DbException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
Set<BatchId> acks = db.removeBatchesToAck(txn, c);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + acks.size() + " acks");
db.commitTransaction(txn);
return acks;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private Set<GroupId> generateSubscriptions(ContactId c) throws DbException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = db.getSubscriptions(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private Map<String, String> generateTransports(ContactId c)
throws DbException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private boolean generateBatch(ContactId c, BundleWriter b)
throws DbException, IOException, GeneralSecurityException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
@@ -242,6 +252,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} catch(SignatureException e) {
db.abortTransaction(txn);
throw e;
@@ -327,11 +340,28 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
public void receiveBundle(ContactId c, BundleReader b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c);
Header h;
Header h = b.getHeader();
receiveAcks(c, h);
receiveSubscriptions(c, h);
receiveTransports(c, h);
// Store the messages
int batches = 0;
Batch batch = null;
while((batch = b.getNextBatch()) != null) {
receiveBatch(c, batch);
batches++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
b.finish();
findLostBatches(c);
System.gc();
}
private void receiveAcks(ContactId c, Header h) throws DbException {
// Mark all messages in acked batches as seen
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
h = b.getHeader();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Set<BatchId> acks = h.getAcks();
@@ -350,6 +380,10 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
}
private void receiveSubscriptions(ContactId c, Header h)
throws DbException {
// Update the contact's subscriptions
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -357,7 +391,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = h.getSubscriptions();
db.setSubscriptions(txn, c, subs);
db.setSubscriptions(txn, c, subs, h.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + subs.size() + " subscriptions");
db.commitTransaction(txn);
@@ -367,6 +401,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
}
private void receiveTransports(ContactId c, Header h) throws DbException {
// Update the contact's transport details
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -374,7 +411,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = h.getTransports();
db.setTransports(txn, c, transports);
db.setTransports(txn, c, transports, h.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + transports.size()
+ " transports");
@@ -385,21 +422,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
// Store the messages
int batches = 0;
Batch batch = null;
while((batch = b.getNextBatch()) != null) {
storeBatch(c, batch);
batches++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
b.finish();
findLostBatches(c);
System.gc();
}
private void storeBatch(ContactId c, Batch b) throws DbException {
private void receiveBatch(ContactId c, Batch b) throws DbException {
waitForPermissionToWrite();
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -511,23 +536,6 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public void setTransports(ContactId c, Map<String, String> transports)
throws DbException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
db.setTransports(txn, c, transports);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public void subscribe(GroupId g) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
synchronized(subscriptionLock) {