package net.sf.briar.db;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseListener.Event;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.writers.AckWriter;
import net.sf.briar.api.protocol.writers.BatchWriter;
import net.sf.briar.api.protocol.writers.OfferWriter;
import net.sf.briar.api.protocol.writers.RequestWriter;
import net.sf.briar.api.protocol.writers.SubscriptionWriter;
import net.sf.briar.api.protocol.writers.TransportWriter;
import net.sf.briar.api.transport.ConnectionWindow;

import com.google.inject.Inject;

/**
 * An implementation of DatabaseComponent using Java synchronization. This
 * implementation does not distinguish between readers and writers.
 * <p>
 * Every public operation follows the same shape: acquire the locks it needs,
 * start a transaction, do the work, commit, and on failure abort the
 * transaction and rethrow. Listeners are always called after the locks have
 * been released.
 */
class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {

	// NOTE(review): the generic type arguments in this file (the Txn type
	// parameter and the Map parameterisations used for transports, transport
	// configs and subscriptions) were reconstructed after being lost from
	// the source text. Element types of collections follow from how the
	// elements are used; the Map key/value types should be confirmed
	// against the Database and DatabaseComponent interfaces.

	private static final Logger LOG =
		Logger.getLogger(SynchronizedDatabaseComponent.class.getName());

	/*
	 * Locks must always be acquired in alphabetical order. See the Database
	 * interface to find out which calls require which locks.
	 */

	private final Object contactLock = new Object();
	private final Object messageLock = new Object();
	private final Object messageStatusLock = new Object();
	private final Object ratingLock = new Object();
	private final Object subscriptionLock = new Object();
	private final Object transportLock = new Object();
	private final Object windowLock = new Object();

	@Inject
	SynchronizedDatabaseComponent(Database<Txn> db, DatabaseCleaner cleaner) {
		super(db, cleaner);
	}

	/**
	 * Removes every message returned by db.getOldMessages() for the given
	 * size, all within a single transaction.
	 */
	protected void expireMessages(int size) throws DbException {
		synchronized(contactLock) {
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					Txn txn = db.startTransaction();
					try {
						for(MessageId m : db.getOldMessages(txn, size)) {
							removeMessage(txn, m);
						}
						db.commitTransaction(txn);
					} catch(DbException e) {
						db.abortTransaction(txn);
						throw e;
					}
				}
			}
		}
	}

	/**
	 * Stops the cleaner and closes the database.
	 */
	public void close() throws DbException {
		cleaner.stopCleaning();
		// Every other database call in this class is made while holding at
		// least one of these locks, so holding all of them (in alphabetical
		// order) ensures no transaction started by this class is in flight
		// while the database is being closed
		synchronized(contactLock) {
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(ratingLock) {
						synchronized(subscriptionLock) {
							synchronized(transportLock) {
								synchronized(windowLock) {
									db.close();
								}
							}
						}
					}
				}
			}
		}
	}

	/**
	 * Adds a contact with the given transports and secret, then notifies
	 * listeners with CONTACTS_UPDATED.
	 *
	 * @return the ID returned by the database for the new contact
	 */
	public ContactId addContact(Map<String, Map<String, String>> transports,
			byte[] secret) throws DbException {
		if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding contact");
		ContactId c;
		synchronized(contactLock) {
			synchronized(transportLock) {
				Txn txn = db.startTransaction();
				try {
					c = db.addContact(txn, transports, secret);
					db.commitTransaction(txn);
					if(LOG.isLoggable(Level.FINE))
						LOG.fine("Added contact " + c);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
		// Call the listeners outside the lock
		callListeners(Event.CONTACTS_UPDATED);
		return c;
	}

	/**
	 * Stores a locally generated group message if the database reports a
	 * matching subscription for the message's group and timestamp. Listeners
	 * receive MESSAGES_ADDED only if the message was actually stored.
	 */
	public void addLocalGroupMessage(Message m) throws DbException {
		boolean added = false;
		waitForPermissionToWrite();
		synchronized(contactLock) {
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						Txn txn = db.startTransaction();
						try {
							// Don't store the message if the user has
							// unsubscribed from the group or the message
							// predates the subscription
							if(db.containsSubscription(txn, m.getGroup(),
									m.getTimestamp())) {
								added = storeGroupMessage(txn, m, null);
								if(!added) {
									if(LOG.isLoggable(Level.FINE))
										LOG.fine("Duplicate local message");
								}
							} else {
								if(LOG.isLoggable(Level.FINE))
									LOG.fine("Not subscribed");
							}
							db.commitTransaction(txn);
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
				}
			}
		}
		// Call the listeners outside the lock
		if(added) callListeners(Event.MESSAGES_ADDED);
	}

	/**
	 * Stores a locally generated private message for the given contact.
	 * Listeners receive MESSAGES_ADDED only if the message was stored.
	 */
	public void addLocalPrivateMessage(Message m, ContactId c)
	throws DbException {
		boolean added = false;
		waitForPermissionToWrite();
		synchronized(contactLock) {
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					Txn txn = db.startTransaction();
					try {
						added = storePrivateMessage(txn, m, c, false);
						db.commitTransaction(txn);
					} catch(DbException e) {
						db.abortTransaction(txn);
						throw e;
					}
				}
			}
		}
		// Call the listeners outside the lock
		if(added) callListeners(Event.MESSAGES_ADDED);
	}

	/**
	 * Fetches the contact's lost batches in one transaction, then removes
	 * each of them in its own transaction. The locks are released and
	 * re-acquired between batches, and the contact's existence is re-checked
	 * each time.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void findLostBatches(ContactId c) throws DbException {
		// Find any lost batches that need to be retransmitted
		Collection<BatchId> lost;
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					Txn txn = db.startTransaction();
					try {
						lost = db.getLostBatches(txn, c);
						db.commitTransaction(txn);
					} catch(DbException e) {
						db.abortTransaction(txn);
						throw e;
					}
				}
			}
		}
		for(BatchId batch : lost) {
			synchronized(contactLock) {
				if(!containsContact(c)) throw new NoSuchContactException();
				synchronized(messageLock) {
					synchronized(messageStatusLock) {
						Txn txn = db.startTransaction();
						try {
							if(LOG.isLoggable(Level.FINE))
								LOG.fine("Removing lost batch");
							db.removeLostBatch(txn, c, batch);
							db.commitTransaction(txn);
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
				}
			}
		}
	}

	/**
	 * Offers each batch ID returned by db.getBatchesToAck() to the writer,
	 * finishes the writer, and passes the IDs for which writeBatchId()
	 * returned true to db.removeBatchesToAck().
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void generateAck(ContactId c, AckWriter a) throws DbException,
	IOException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageStatusLock) {
				Txn txn = db.startTransaction();
				try {
					Collection<BatchId> acks = db.getBatchesToAck(txn, c);
					Collection<BatchId> sent = new ArrayList<BatchId>();
					for(BatchId b : acks) if(a.writeBatchId(b)) sent.add(b);
					a.finish();
					db.removeBatchesToAck(txn, c, sent);
					if(LOG.isLoggable(Level.FINE))
						LOG.fine("Added " + acks.size() + " acks");
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				} catch(IOException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Writes sendable messages to the batch writer until the writer refuses
	 * one, then records the batch as outstanding if any message was written.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void generateBatch(ContactId c, BatchWriter b) throws DbException,
	IOException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						Txn txn = db.startTransaction();
						try {
							Collection<MessageId> sent =
								new ArrayList<MessageId>();
							int capacity = b.getCapacity();
							Collection<MessageId> sendable =
								db.getSendableMessages(txn, c, capacity);
							for(MessageId m : sendable) {
								byte[] raw = db.getMessage(txn, m);
								if(!b.writeMessage(raw)) break;
								sent.add(m);
							}
							// If the batch is not empty, calculate its ID and
							// record it as outstanding
							if(!sent.isEmpty()) {
								BatchId id = b.finish();
								db.addOutstandingBatch(txn, c, id, sent);
							}
							db.commitTransaction(txn);
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						} catch(IOException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
				}
			}
		}
	}

	/**
	 * Writes the requested messages that are still sendable to the batch
	 * writer, stopping at the first message the writer refuses, and records
	 * the batch as outstanding if any message was written.
	 *
	 * @return the requested IDs that were considered: every ID examined
	 * before the writer refused a message, whether or not it was sendable
	 * @throws NoSuchContactException if the contact is not found
	 */
	public Collection<MessageId> generateBatch(ContactId c, BatchWriter b,
			Collection<MessageId> requested) throws DbException,
			IOException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						Txn txn = db.startTransaction();
						try {
							Collection<MessageId> sent =
								new ArrayList<MessageId>();
							Collection<MessageId> considered =
								new ArrayList<MessageId>();
							for(MessageId m : requested) {
								byte[] raw = db.getMessageIfSendable(txn, c, m);
								// If the message is still sendable, try to add
								// it to the batch. If the batch is full, don't
								// treat the message as considered, and don't
								// try to add any further messages.
								if(raw != null) {
									if(!b.writeMessage(raw)) break;
									sent.add(m);
								}
								considered.add(m);
							}
							// If the batch is not empty, calculate its ID and
							// record it as outstanding
							if(!sent.isEmpty()) {
								BatchId id = b.finish();
								db.addOutstandingBatch(txn, c, id, sent);
							}
							db.commitTransaction(txn);
							return considered;
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						} catch(IOException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
				}
			}
		}
	}

	/**
	 * Writes the IDs of sendable messages to the offer writer until the
	 * writer refuses one, then finishes the writer.
	 *
	 * @return the IDs that were written
	 * @throws NoSuchContactException if the contact is not found
	 */
	public Collection<MessageId> generateOffer(ContactId c, OfferWriter o)
	throws DbException, IOException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					Txn txn = db.startTransaction();
					try {
						Collection<MessageId> sendable =
							db.getSendableMessages(txn, c, Integer.MAX_VALUE);
						Collection<MessageId> sent =
							new ArrayList<MessageId>();
						for(MessageId m : sendable) {
							if(!o.writeMessageId(m)) break;
							sent.add(m);
						}
						o.finish();
						db.commitTransaction(txn);
						return sent;
					} catch(DbException e) {
						db.abortTransaction(txn);
						throw e;
					} catch(IOException e) {
						db.abortTransaction(txn);
						throw e;
					}
				}
			}
		}
	}

	/**
	 * Writes the subscriptions visible to the contact, stamped with the
	 * current time, to the subscription writer.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void generateSubscriptionUpdate(ContactId c, SubscriptionWriter s)
	throws DbException, IOException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(subscriptionLock) {
				Txn txn = db.startTransaction();
				try {
					Map<Group, Long> subs = db.getVisibleSubscriptions(txn, c);
					s.writeSubscriptions(subs, System.currentTimeMillis());
					if(LOG.isLoggable(Level.FINE))
						LOG.fine("Added " + subs.size() + " subscriptions");
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				} catch(IOException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Writes the local transports, stamped with the current time, to the
	 * transport writer.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void generateTransportUpdate(ContactId c, TransportWriter t)
	throws DbException, IOException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(transportLock) {
				Txn txn = db.startTransaction();
				try {
					Map<String, Map<String, String>> transports =
						db.getTransports(txn);
					t.writeTransports(transports, System.currentTimeMillis());
					if(LOG.isLoggable(Level.FINE))
						LOG.fine("Added " + transports.size() + " transports");
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				} catch(IOException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Returns the connection window stored for the given contact and
	 * transport.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public ConnectionWindow getConnectionWindow(ContactId c, int transportId)
	throws DbException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(windowLock) {
				Txn txn = db.startTransaction();
				try {
					ConnectionWindow w =
						db.getConnectionWindow(txn, c, transportId);
					db.commitTransaction(txn);
					return w;
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/** Returns the contacts reported by the database. */
	public Collection<ContactId> getContacts() throws DbException {
		synchronized(contactLock) {
			Txn txn = db.startTransaction();
			try {
				Collection<ContactId> contacts = db.getContacts(txn);
				db.commitTransaction(txn);
				return contacts;
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
	}

	/** Returns the rating stored for the given author. */
	public Rating getRating(AuthorId a) throws DbException {
		synchronized(ratingLock) {
			Txn txn = db.startTransaction();
			try {
				Rating r = db.getRating(txn, a);
				db.commitTransaction(txn);
				return r;
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
	}

	/**
	 * Returns the secret stored for the given contact.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public byte[] getSharedSecret(ContactId c) throws DbException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			Txn txn = db.startTransaction();
			try {
				byte[] secret = db.getSharedSecret(txn, c);
				db.commitTransaction(txn);
				return secret;
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
	}

	/** Returns the subscriptions reported by the database. */
	public Collection<Group> getSubscriptions() throws DbException {
		synchronized(subscriptionLock) {
			Txn txn = db.startTransaction();
			try {
				Collection<Group> subs = db.getSubscriptions(txn);
				db.commitTransaction(txn);
				return subs;
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
	}

	/** Returns the configuration stored for the named transport. */
	public Map<String, String> getTransportConfig(String name)
	throws DbException {
		synchronized(transportLock) {
			Txn txn = db.startTransaction();
			try {
				Map<String, String> config = db.getTransportConfig(txn, name);
				db.commitTransaction(txn);
				return config;
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
	}

	/** Returns the local transports reported by the database. */
	public Map<String, Map<String, String>> getTransports()
	throws DbException {
		synchronized(transportLock) {
			Txn txn = db.startTransaction();
			try {
				Map<String, Map<String, String>> transports =
					db.getTransports(txn);
				db.commitTransaction(txn);
				return transports;
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
	}

	/**
	 * Returns the transports stored for the given contact.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public Map<String, Map<String, String>> getTransports(ContactId c)
	throws DbException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(transportLock) {
				Txn txn = db.startTransaction();
				try {
					Map<String, Map<String, String>> transports =
						db.getTransports(txn, c);
					db.commitTransaction(txn);
					return transports;
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/** Returns the contacts the database reports for the group's visibility. */
	public Collection<ContactId> getVisibility(GroupId g) throws DbException {
		synchronized(contactLock) {
			synchronized(subscriptionLock) {
				Txn txn = db.startTransaction();
				try {
					Collection<ContactId> visible = db.getVisibility(txn, g);
					db.commitTransaction(txn);
					return visible;
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Returns whether the database reports any sendable messages for the
	 * given contact.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public boolean hasSendableMessages(ContactId c) throws DbException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						Txn txn = db.startTransaction();
						try {
							boolean has = db.hasSendableMessages(txn, c);
							db.commitTransaction(txn);
							return has;
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
				}
			}
		}
	}

	/**
	 * Removes each batch named in the ack from the database, using a
	 * separate transaction per batch while the locks are held throughout.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void receiveAck(ContactId c, Ack a) throws DbException {
		// Mark all messages in acked batches as seen
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					Collection<BatchId> acks = a.getBatchIds();
					for(BatchId ack : acks) {
						Txn txn = db.startTransaction();
						try {
							db.removeAckedBatch(txn, c, ack);
							db.commitTransaction(txn);
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
					if(LOG.isLoggable(Level.FINE))
						LOG.fine("Received " + acks.size() + " acks");
				}
			}
		}
	}

	/**
	 * Stores the batch's messages and records the batch as needing an ack.
	 * Listeners receive MESSAGES_ADDED only if storeMessages() reports that
	 * something was added.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void receiveBatch(ContactId c, Batch b) throws DbException {
		boolean anyAdded = false;
		waitForPermissionToWrite();
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						Txn txn = db.startTransaction();
						try {
							anyAdded = storeMessages(txn, c, b.getMessages());
							db.addBatchToAck(txn, c, b.getId());
							db.commitTransaction(txn);
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
				}
			}
		}
		// Call the listeners outside the lock
		if(anyAdded) callListeners(Event.MESSAGES_ADDED);
	}

	/**
	 * Builds a request bitmap for an offer: bit i is set when
	 * db.setStatusSeenIfVisible() returns false for the i-th offered
	 * message. The bitmap is written to the request writer after the
	 * transaction has been committed.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void receiveOffer(ContactId c, Offer o, RequestWriter r)
	throws DbException, IOException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						Collection<MessageId> offered = o.getMessageIds();
						BitSet request = new BitSet(offered.size());
						Txn txn = db.startTransaction();
						try {
							Iterator<MessageId> it = offered.iterator();
							for(int i = 0; it.hasNext(); i++) {
								// If the message is not in the database, or if
								// it is not visible to the contact, request it
								MessageId m = it.next();
								if(!db.setStatusSeenIfVisible(txn, c, m))
									request.set(i);
							}
							db.commitTransaction(txn);
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						}
						r.writeRequest(request, offered.size());
					}
				}
			}
		}
	}

	/**
	 * Stores the subscriptions and timestamp carried by the update for the
	 * given contact.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s)
	throws DbException {
		// Update the contact's subscriptions
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(subscriptionLock) {
				Txn txn = db.startTransaction();
				try {
					Map<Group, Long> subs = s.getSubscriptions();
					db.setSubscriptions(txn, c, subs, s.getTimestamp());
					if(LOG.isLoggable(Level.FINE))
						LOG.fine("Received " + subs.size() + " subscriptions");
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Stores the transports and timestamp carried by the update for the
	 * given contact.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 */
	public void receiveTransportUpdate(ContactId c, TransportUpdate t)
	throws DbException {
		// Update the contact's transport properties
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(transportLock) {
				Txn txn = db.startTransaction();
				try {
					Map<String, Map<String, String>> transports =
						t.getTransports();
					db.setTransports(txn, c, transports, t.getTimestamp());
					if(LOG.isLoggable(Level.FINE))
						LOG.fine("Received " + transports.size()
								+ " transports");
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Removes the contact from the database, then notifies listeners with
	 * CONTACTS_UPDATED.
	 */
	public void removeContact(ContactId c) throws DbException {
		if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing contact " + c);
		synchronized(contactLock) {
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						synchronized(transportLock) {
							Txn txn = db.startTransaction();
							try {
								db.removeContact(txn, c);
								db.commitTransaction(txn);
							} catch(DbException e) {
								db.abortTransaction(txn);
								throw e;
							}
						}
					}
				}
			}
		}
		// Call the listeners outside the lock
		callListeners(Event.CONTACTS_UPDATED);
	}

	/**
	 * Stores the connection window for the given contact and transport.
	 *
	 * @throws NoSuchContactException if the contact is not found
	 * @throws DbException if the database operation fails; the transaction
	 * is aborted before the exception is propagated
	 */
	public void setConnectionWindow(ContactId c, int transportId,
			ConnectionWindow w) throws DbException {
		synchronized(contactLock) {
			if(!containsContact(c)) throw new NoSuchContactException();
			synchronized(windowLock) {
				Txn txn = db.startTransaction();
				try {
					db.setConnectionWindow(txn, c, transportId, w);
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					// Propagate the failure so the caller doesn't assume
					// the window was stored
					throw e;
				}
			}
		}
	}

	/**
	 * Stores the rating for the given author. When the rating changes to or
	 * from GOOD, the sendability of the author's messages is updated in the
	 * same transaction.
	 */
	public void setRating(AuthorId a, Rating r) throws DbException {
		synchronized(messageLock) {
			synchronized(ratingLock) {
				Txn txn = db.startTransaction();
				try {
					Rating old = db.setRating(txn, a, r);
					// Update the sendability of the author's messages
					if(r == Rating.GOOD && old != Rating.GOOD)
						updateAuthorSendability(txn, a, true);
					else if(r != Rating.GOOD && old == Rating.GOOD)
						updateAuthorSendability(txn, a, false);
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Stores the configuration for the named transport if it differs from
	 * the stored one, and notifies listeners with TRANSPORTS_UPDATED only
	 * when something changed.
	 */
	public void setTransportConfig(String name, Map<String, String> config)
	throws DbException {
		boolean changed = false;
		synchronized(transportLock) {
			Txn txn = db.startTransaction();
			try {
				Map<String, String> old = db.getTransportConfig(txn, name);
				if(!config.equals(old)) {
					db.setTransportConfig(txn, name, config);
					changed = true;
				}
				db.commitTransaction(txn);
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
		// Call the listeners outside the lock
		if(changed) callListeners(Event.TRANSPORTS_UPDATED);
	}

	/**
	 * Stores the properties for the named transport if they differ from the
	 * stored ones, and notifies listeners with TRANSPORTS_UPDATED only when
	 * something changed.
	 */
	public void setTransportProperties(String name,
			Map<String, String> properties) throws DbException {
		boolean changed = false;
		synchronized(transportLock) {
			Txn txn = db.startTransaction();
			try {
				Map<String, String> old = db.getTransports(txn).get(name);
				if(!properties.equals(old)) {
					db.setTransportProperties(txn, name, properties);
					changed = true;
				}
				db.commitTransaction(txn);
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
		// Call the listeners outside the lock
		if(changed) callListeners(Event.TRANSPORTS_UPDATED);
	}

	/**
	 * Sets the group's visibility to those of the given contacts that the
	 * database still contains.
	 */
	public void setVisibility(GroupId g, Collection<ContactId> visible)
	throws DbException {
		synchronized(contactLock) {
			synchronized(subscriptionLock) {
				Txn txn = db.startTransaction();
				try {
					// Remove any ex-contacts from the set
					Collection<ContactId> present =
						new ArrayList<ContactId>(visible.size());
					for(ContactId c : visible) {
						if(db.containsContact(txn, c)) present.add(c);
					}
					db.setVisibility(txn, g, present);
					db.commitTransaction(txn);
				} catch(DbException e) {
					db.abortTransaction(txn);
					throw e;
				}
			}
		}
	}

	/**
	 * Subscribes to the group if there is no existing subscription, and
	 * notifies listeners with SUBSCRIPTIONS_UPDATED only when a subscription
	 * was added.
	 */
	public void subscribe(Group g) throws DbException {
		if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g);
		boolean added = false;
		synchronized(subscriptionLock) {
			Txn txn = db.startTransaction();
			try {
				// Only add the subscription if it doesn't already exist
				if(!db.containsSubscription(txn, g.getId())) {
					db.addSubscription(txn, g);
					added = true;
				}
				db.commitTransaction(txn);
			} catch(DbException e) {
				db.abortTransaction(txn);
				throw e;
			}
		}
		// Call the listeners outside the lock
		if(added) callListeners(Event.SUBSCRIPTIONS_UPDATED);
	}

	/**
	 * Unsubscribes from the group if a subscription exists, and notifies
	 * listeners with SUBSCRIPTIONS_UPDATED only when a subscription was
	 * removed.
	 */
	public void unsubscribe(GroupId g) throws DbException {
		if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g);
		boolean removed = false;
		synchronized(contactLock) {
			synchronized(messageLock) {
				synchronized(messageStatusLock) {
					synchronized(subscriptionLock) {
						Txn txn = db.startTransaction();
						try {
							if(db.containsSubscription(txn, g)) {
								db.removeSubscription(txn, g);
								removed = true;
							}
							db.commitTransaction(txn);
						} catch(DbException e) {
							db.abortTransaction(txn);
							throw e;
						}
					}
				}
			}
		}
		// Call the listeners outside the lock
		if(removed) callListeners(Event.SUBSCRIPTIONS_UPDATED);
	}
}