package net.sf.briar.db; import java.io.IOException; import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; import net.sf.briar.api.ContactId; import net.sf.briar.api.Rating; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.Subscriptions; import net.sf.briar.api.protocol.Transports; import net.sf.briar.api.protocol.writers.AckWriter; import net.sf.briar.api.protocol.writers.BatchWriter; import net.sf.briar.api.protocol.writers.OfferWriter; import net.sf.briar.api.protocol.writers.RequestWriter; import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.TransportWriter; import com.google.inject.Inject; /** * An implementation of DatabaseComponent using reentrant read-write locks. * This implementation can allow writers to starve. */ class ReadWriteLockDatabaseComponent extends DatabaseComponentImpl { private static final Logger LOG = Logger.getLogger(ReadWriteLockDatabaseComponent.class.getName()); /* * Locks must always be acquired in alphabetical order. See the Database * interface to find out which calls require which locks. */ private final ReentrantReadWriteLock contactLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock messageLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock messageStatusLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock ratingLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock subscriptionLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock transportLock = new ReentrantReadWriteLock(true); @Inject ReadWriteLockDatabaseComponent(Database db, DatabaseCleaner cleaner) { super(db, cleaner); } protected void expireMessages(int size) throws DbException { contactLock.readLock().lock(); try { messageLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { for(MessageId m : db.getOldMessages(txn, size)) { removeMessage(txn, m); } db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void close() throws DbException { cleaner.stopCleaning(); contactLock.writeLock().lock(); try { messageLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { ratingLock.writeLock().lock(); try { subscriptionLock.writeLock().lock(); try { db.close(); } finally { subscriptionLock.writeLock().unlock(); } } finally { ratingLock.writeLock().unlock(); } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.writeLock().unlock(); } } finally { contactLock.writeLock().unlock(); } } public ContactId addContact(Map transports) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding contact"); contactLock.writeLock().lock(); try { transportLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { ContactId c = db.addContact(txn, transports); db.commitTransaction(txn); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added contact " + c); return c; } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { transportLock.writeLock().unlock(); } } finally { contactLock.writeLock().unlock(); } } public void addLocallyGeneratedMessage(Message m) throws DbException { waitForPermissionToWrite(); contactLock.readLock().lock(); try { messageLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { subscriptionLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { // Don't store the message if the user has // unsubscribed from the group if(db.containsSubscription(txn, m.getGroup())) { boolean added = storeMessage(txn, m, null); if(!added) { if(LOG.isLoggable(Level.FINE)) LOG.fine("Duplicate local message"); } } else { if(LOG.isLoggable(Level.FINE)) LOG.fine("Not subscribed"); } db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { subscriptionLock.readLock().unlock(); } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void findLostBatches(ContactId c) throws DbException { // Find any lost batches that need to be retransmitted Collection lost; contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { lost = db.getLostBatches(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } for(BatchId batch : lost) { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing lost batch"); db.removeLostBatch(txn, c, batch); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } } public void generateAck(ContactId c, AckWriter a) throws DbException, IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { Collection acks = db.getBatchesToAck(txn, c); Collection sent = new ArrayList(); for(BatchId b : acks) if(a.writeBatchId(b)) sent.add(b); a.finish(); db.removeBatchesToAck(txn, c, sent); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + acks.size() + " acks"); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } catch(IOException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void generateBatch(ContactId c, BatchWriter b) throws DbException, IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { Collection sent; int bytesSent = 0; messageStatusLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { int capacity = b.getCapacity(); Iterator it = db.getSendableMessages(txn, c, capacity).iterator(); sent = new ArrayList(); while(it.hasNext()) { MessageId m = it.next(); byte[] message = db.getMessage(txn, m); if(!b.writeMessage(message)) break; bytesSent += message.length; sent.add(m); } db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } catch(IOException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.readLock().unlock(); } BatchId id = b.finish(); // Record the contents of the batch, unless it's empty if(sent.isEmpty()) return; messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { db.addOutstandingBatch(txn, c, id, sent); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public Collection generateBatch(ContactId c, BatchWriter b, Collection requested) throws DbException, IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { Collection sent; messageStatusLock.readLock().lock(); try{ Txn txn = db.startTransaction(); try { sent = new ArrayList(); int bytesSent = 0; for(MessageId m : requested) { byte[] message = db.getMessageIfSendable(txn, c, m); if(message == null) continue; if(!b.writeMessage(message)) break; bytesSent += message.length; sent.add(m); } db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } catch(IOException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.readLock().unlock(); } BatchId id = b.finish(); // Record the contents of the batch, unless it's empty if(sent.isEmpty()) return sent; messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { db.addOutstandingBatch(txn, c, id, sent); db.commitTransaction(txn); return sent; } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public Collection generateOffer(ContactId c, OfferWriter o) throws DbException, IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { messageStatusLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Collection sendable = db.getSendableMessages(txn, c, Integer.MAX_VALUE); Iterator it = sendable.iterator(); Collection sent = new ArrayList(); while(it.hasNext()) { MessageId m = it.next(); if(!o.writeMessageId(m)) break; sent.add(m); } o.finish(); db.commitTransaction(txn); return sent; } catch(DbException e) { db.abortTransaction(txn); throw e; } catch(IOException e) { db.abortTransaction(txn); throw e; } } finally { messageStatusLock.readLock().unlock(); } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void generateSubscriptions(ContactId c, SubscriptionWriter s) throws DbException, IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); subscriptionLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Collection subs = db.getSubscriptions(txn); s.writeSubscriptions(subs); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + subs.size() + " subscriptions"); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } catch(IOException e) { db.abortTransaction(txn); throw e; } } finally { subscriptionLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void generateTransports(ContactId c, TransportWriter t) throws DbException, IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); transportLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Map transports = db.getTransports(txn); t.writeTransports(transports); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + transports.size() + " transports"); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } catch(IOException e) { db.abortTransaction(txn); throw e; } } finally { transportLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public Collection getContacts() throws DbException { contactLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Collection contacts = db.getContacts(txn); db.commitTransaction(txn); return contacts; } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { contactLock.readLock().unlock(); } } public Rating getRating(AuthorId a) throws DbException { ratingLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Rating r = db.getRating(txn, a); db.commitTransaction(txn); return r; } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { ratingLock.readLock().unlock(); } } public Collection getSubscriptions() throws DbException { subscriptionLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Collection subs = db.getSubscriptions(txn); db.commitTransaction(txn); return subs; } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { subscriptionLock.readLock().unlock(); } } public Map getTransports() throws DbException { transportLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Map transports = db.getTransports(txn); db.commitTransaction(txn); return transports; } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { transportLock.readLock().unlock(); } } public Map getTransports(ContactId c) throws DbException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); transportLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { Map transports = db.getTransports(txn, c); db.commitTransaction(txn); return transports; } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { transportLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void receiveAck(ContactId c, Ack a) throws DbException { // Mark all messages in acked batches as seen contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { messageStatusLock.writeLock().lock(); try { Collection acks = a.getBatchIds(); for(BatchId ack : acks) { Txn txn = db.startTransaction(); try { db.removeAckedBatch(txn, c, ack); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + acks.size() + " acks"); } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void receiveBatch(ContactId c, Batch b) throws DbException { waitForPermissionToWrite(); contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { subscriptionLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { int received = 0, stored = 0; for(Message m : b.getMessages()) { received++; GroupId g = m.getGroup(); if(db.containsSubscription(txn, g)) { if(storeMessage(txn, m, c)) stored++; } } if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + received + " messages, stored " + stored); db.addBatchToAck(txn, c, b.getId()); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { subscriptionLock.readLock().unlock(); } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void receiveOffer(ContactId c, Offer o, RequestWriter r) throws DbException, IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { messageStatusLock.writeLock().lock(); try { subscriptionLock.readLock().lock(); try { Collection offered = o.getMessageIds(); BitSet request = new BitSet(offered.size()); Txn txn = db.startTransaction(); try { Iterator it = offered.iterator(); for(int i = 0; it.hasNext(); i++) { // If the message is not in the database, or if // it is not visible to the contact, request it MessageId m = it.next(); if(!db.setStatusSeenIfVisible(txn, c, m)) request.set(i); } db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } r.writeBitmap(request, offered.size()); } finally { subscriptionLock.readLock().unlock(); } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } } public void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException { // Update the contact's subscriptions contactLock.writeLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); subscriptionLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { Collection subs = s.getSubscriptions(); db.setSubscriptions(txn, c, subs, s.getTimestamp()); if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + subs.size() + " subscriptions"); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { subscriptionLock.writeLock().unlock(); } } finally { contactLock.writeLock().unlock(); } } public void receiveTransports(ContactId c, Transports t) throws DbException { // Update the contact's transport details contactLock.writeLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); transportLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { Map transports = t.getTransports(); db.setTransports(txn, c, transports, t.getTimestamp()); if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + transports.size() + " transports"); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { transportLock.writeLock().unlock(); } } finally { contactLock.writeLock().unlock(); } } public void removeContact(ContactId c) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing contact " + c); contactLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { subscriptionLock.writeLock().lock(); try { transportLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { db.removeContact(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { transportLock.writeLock().unlock(); } } finally { subscriptionLock.writeLock().unlock(); } } finally { messageStatusLock.writeLock().unlock(); } } finally { contactLock.writeLock().unlock(); } } public void setRating(AuthorId a, Rating r) throws DbException { messageLock.writeLock().lock(); try { ratingLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { Rating old = db.setRating(txn, a, r); // Update the sendability of the author's messages if(r == Rating.GOOD && old != Rating.GOOD) updateAuthorSendability(txn, a, true); else if(r != Rating.GOOD && old == Rating.GOOD) updateAuthorSendability(txn, a, false); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { ratingLock.writeLock().unlock(); } } finally { messageLock.writeLock().unlock(); } } public void subscribe(Group g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); subscriptionLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { db.addSubscription(txn, g); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { subscriptionLock.writeLock().unlock(); } } public void unsubscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g); contactLock.readLock().lock(); try { messageLock.writeLock().lock(); try { messageStatusLock.writeLock().lock(); try { subscriptionLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { db.removeSubscription(txn, g); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; } } finally { subscriptionLock.writeLock().unlock(); } } finally { messageStatusLock.writeLock().unlock(); } } finally { messageLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } } }