- * Locking is provided by the DatabaseComponent implementation. To prevent
- * deadlock, locks must be acquired in the following (alphabetical) order:
- *
{
- /** Opens the database and returns true if the database already existed. */
+ /**
+ * Opens the database and returns true if the database already existed.
+ *
+ * Locking: write.
+ */
boolean open() throws DbException, IOException;
/**
* Prevents new transactions from starting, waits for all current
* transactions to finish, and closes the database.
+ *
+ * Locking: write.
*/
void close() throws DbException, IOException;
@@ -92,8 +85,7 @@ interface Database {
* Stores a contact associated with the given local and remote pseudonyms,
* and returns an ID for the contact.
*
- * Locking: contact write, message write, retention write,
- * subscription write, transport write, window write.
+ * Locking: write.
*/
ContactId addContact(T txn, Author remote, AuthorId local)
throws DbException;
@@ -101,7 +93,7 @@ interface Database {
/**
* Stores an endpoint.
*
- * Locking: window write.
+ * Locking: write.
*/
void addEndpoint(T txn, Endpoint ep) throws DbException;
@@ -109,29 +101,28 @@ interface Database {
* Subscribes to a group, or returns false if the user already has the
* maximum number of subscriptions.
*
- * Locking: message write, subscription write.
+ * Locking: write.
*/
boolean addGroup(T txn, Group g) throws DbException;
/**
* Stores a local pseudonym.
*
- * Locking: contact write, identity write, message write, retention write,
- * subscription write, transport write, window write.
+ * Locking: write.
*/
void addLocalAuthor(T txn, LocalAuthor a) throws DbException;
/**
* Stores a message.
*
- * Locking: message write.
+ * Locking: write.
*/
void addMessage(T txn, Message m, boolean local) throws DbException;
/**
* Records that a message has been offered by the given contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
@@ -139,7 +130,7 @@ interface Database {
* Stores the given temporary secrets and deletes any secrets that have
* been made obsolete.
*
- * Locking: window write.
+ * Locking: write.
*/
void addSecrets(T txn, Collection secrets)
throws DbException;
@@ -147,10 +138,10 @@ interface Database {
/**
* Initialises the status of the given message with respect to the given
* contact.
+ *
+ * Locking: write.
* @param ack whether the message needs to be acknowledged.
* @param seen whether the contact has seen the message.
- *
- * Locking: message write.
*/
void addStatus(T txn, ContactId c, MessageId m, boolean ack, boolean seen)
throws DbException;
@@ -159,7 +150,7 @@ interface Database {
* Stores a transport and returns true if the transport was not previously
* in the database.
*
- * Locking: transport write, window write.
+ * Locking: write.
*/
boolean addTransport(T txn, TransportId t, long maxLatency)
throws DbException;
@@ -167,49 +158,49 @@ interface Database {
/**
* Makes a group visible to the given contact.
*
- * Locking: subscription write.
+ * Locking: write.
*/
void addVisibility(T txn, ContactId c, GroupId g) throws DbException;
/**
* Returns true if the database contains the given contact.
*
- * Locking: contact read.
+ * Locking: read.
*/
boolean containsContact(T txn, AuthorId a) throws DbException;
/**
* Returns true if the database contains the given contact.
*
- * Locking: contact read.
+ * Locking: read.
*/
boolean containsContact(T txn, ContactId c) throws DbException;
/**
* Returns true if the user subscribes to the given group.
*
- * Locking: subscription read.
+ * Locking: read.
*/
boolean containsGroup(T txn, GroupId g) throws DbException;
/**
* Returns true if the database contains the given local pseudonym.
*
- * Locking: identity read.
+ * Locking: read.
*/
boolean containsLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns true if the database contains the given message.
*
- * Locking: message read.
+ * Locking: read.
*/
boolean containsMessage(T txn, MessageId m) throws DbException;
/**
* Returns true if the database contains the given transport.
*
- * Locking: transport read.
+ * Locking: read.
*/
boolean containsTransport(T txn, TransportId t) throws DbException;
@@ -217,7 +208,7 @@ interface Database {
* Returns true if the user subscribes to the given group and the group is
* visible to the given contact.
*
- * Locking: subscription read.
+ * Locking: read.
*/
boolean containsVisibleGroup(T txn, ContactId c, GroupId g)
throws DbException;
@@ -226,7 +217,7 @@ interface Database {
* Returns true if the database contains the given message and the message
* is visible to the given contact.
*
- * Locking: message read, subscription read.
+ * Locking: read.
*/
boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException;
@@ -234,7 +225,7 @@ interface Database {
/**
* Returns the number of messages offered by the given contact.
*
- * Locking: message read.
+ * Locking: read.
*/
int countOfferedMessages(T txn, ContactId c) throws DbException;
@@ -242,49 +233,49 @@ interface Database {
* Returns the status of all groups to which the user subscribes or can
* subscribe, excluding inbox groups.
*
- * Locking: subscription read.
+ * Locking: read.
*/
Collection getAvailableGroups(T txn) throws DbException;
/**
* Returns the configuration for the given transport.
*
- * Locking: transport read.
+ * Locking: read.
*/
TransportConfig getConfig(T txn, TransportId t) throws DbException;
/**
* Returns the contact with the given ID.
*
- * Locking: contact read.
+ * Locking: read.
*/
Contact getContact(T txn, ContactId c) throws DbException;
/**
* Returns the IDs of all contacts.
*
- * Locking: contact read.
+ * Locking: read.
*/
Collection getContactIds(T txn) throws DbException;
/**
* Returns all contacts.
*
- * Locking: contact read, window read.
+ * Locking: read.
*/
Collection getContacts(T txn) throws DbException;
/**
* Returns all contacts associated with the given local pseudonym.
*
- * Locking: contact read.
+ * Locking: read.
*/
Collection getContacts(T txn, AuthorId a) throws DbException;
/**
* Returns all endpoints.
*
- * Locking: window read.
+ * Locking: read.
*/
Collection getEndpoints(T txn) throws DbException;
@@ -298,14 +289,14 @@ interface Database {
/**
* Returns the group with the given ID, if the user subscribes to it.
*
- * Locking: subscription read.
+ * Locking: read.
*/
Group getGroup(T txn, GroupId g) throws DbException;
/**
* Returns all groups to which the user subscribes.
*
- * Locking: subscription read.
+ * Locking: read.
*/
Collection getGroups(T txn) throws DbException;
@@ -313,7 +304,7 @@ interface Database {
* Returns the ID of the inbox group for the given contact, or null if no
* inbox group has been set.
*
- * Locking: contact read, subscription read.
+ * Locking: read.
*/
GroupId getInboxGroupId(T txn, ContactId c) throws DbException;
@@ -321,7 +312,7 @@ interface Database {
* Returns the headers of all messages in the inbox group for the given
* contact, or null if no inbox group has been set.
*
- * Locking: contact read, identity read, message read, subscription read.
+ * Locking: read.
*/
Collection getInboxMessageHeaders(T txn, ContactId c)
throws DbException;
@@ -329,21 +320,21 @@ interface Database {
/**
* Returns the local pseudonym with the given ID.
*
- * Locking: identity read.
+ * Locking: read.
*/
LocalAuthor getLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns all local pseudonyms.
*
- * Locking: identity read.
+ * Locking: read.
*/
Collection getLocalAuthors(T txn) throws DbException;
/**
* Returns the local transport properties for all transports.
*
- * Locking: transport read.
+ * Locking: read.
*/
Map getLocalProperties(T txn)
throws DbException;
@@ -351,7 +342,7 @@ interface Database {
/**
* Returns the local transport properties for the given transport.
*
- * Locking: transport read.
+ * Locking: read.
*/
TransportProperties getLocalProperties(T txn, TransportId t)
throws DbException;
@@ -359,14 +350,14 @@ interface Database {
/**
* Returns the body of the message identified by the given ID.
*
- * Locking: message read.
+ * Locking: read.
*/
byte[] getMessageBody(T txn, MessageId m) throws DbException;
/**
* Returns the headers of all messages in the given group.
*
- * Locking: message read.
+ * Locking: read.
*/
Collection getMessageHeaders(T txn, GroupId g)
throws DbException;
@@ -375,7 +366,7 @@ interface Database {
* Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages.
*
- * Locking: message read.
+ * Locking: read.
*/
Collection getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
@@ -384,7 +375,7 @@ interface Database {
* Returns the IDs of some messages that are eligible to be offered to the
* given contact, up to the given number of messages.
*
- * Locking: message read, subscription read.
+ * Locking: read.
*/
Collection getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -393,7 +384,7 @@ interface Database {
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given total length.
*
- * Locking: message read, subscription read.
+ * Locking: read.
*/
Collection getMessagesToSend(T txn, ContactId c, int maxLength)
throws DbException;
@@ -402,7 +393,7 @@ interface Database {
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
*
- * Locking: message read.
+ * Locking: read.
*/
Collection getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -411,7 +402,7 @@ interface Database {
* Returns the IDs of the oldest messages in the database, with a total
* size less than or equal to the given size.
*
- * Locking: message read.
+ * Locking: read.
*/
Collection getOldMessages(T txn, int size) throws DbException;
@@ -420,28 +411,28 @@ interface Database {
* has no parent, or the parent is absent from the database, or the parent
* belongs to a different group.
*
- * Locking: message read.
+ * Locking: read.
*/
MessageId getParent(T txn, MessageId m) throws DbException;
/**
* Returns the message identified by the given ID, in serialised form.
*
- * Locking: message read.
+ * Locking: read.
*/
byte[] getRawMessage(T txn, MessageId m) throws DbException;
/**
* Returns true if the given message is marked as read.
*
- * Locking: message read.
+ * Locking: read.
*/
boolean getReadFlag(T txn, MessageId m) throws DbException;
/**
* Returns all remote properties for the given transport.
*
- * Locking: transport read.
+ * Locking: read.
*/
Map getRemoteProperties(T txn,
TransportId t) throws DbException;
@@ -451,7 +442,7 @@ interface Database {
* given contact and have been requested by the contact, up to the given
* total length.
*
- * Locking: message read, subscription read.
+ * Locking: read.
*/
Collection getRequestedMessagesToSend(T txn, ContactId c,
int maxLength) throws DbException;
@@ -459,7 +450,7 @@ interface Database {
/**
* Returns a retention ack for the given contact, or null if no ack is due.
*
- * Locking: retention write.
+ * Locking: write.
*/
RetentionAck getRetentionAck(T txn, ContactId c) throws DbException;
@@ -467,7 +458,7 @@ interface Database {
* Returns a retention update for the given contact and updates its expiry
* time using the given latency, or returns null if no update is due.
*
- * Locking: message read, retention write.
+ * Locking: write.
*/
RetentionUpdate getRetentionUpdate(T txn, ContactId c, long maxLatency)
throws DbException;
@@ -475,21 +466,21 @@ interface Database {
/**
* Returns all temporary secrets.
*
- * Locking: window read.
+ * Locking: read.
*/
Collection getSecrets(T txn) throws DbException;
/**
* Returns all settings.
*
- * Locking: setting read.
+ * Locking: read.
*/
Settings getSettings(T txn) throws DbException;
/**
* Returns all contacts who subscribe to the given group.
*
- * Locking: subscription read.
+ * Locking: read.
*/
Collection getSubscribers(T txn, GroupId g) throws DbException;
@@ -497,7 +488,7 @@ interface Database {
* Returns a subscription ack for the given contact, or null if no ack is
* due.
*
- * Locking: subscription write.
+ * Locking: write.
*/
SubscriptionAck getSubscriptionAck(T txn, ContactId c) throws DbException;
@@ -505,7 +496,7 @@ interface Database {
* Returns a subscription update for the given contact and updates its
* expiry time using the given latency, or returns null if no update is due.
*
- * Locking: subscription write.
+ * Locking: write.
*/
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c,
long maxLatency) throws DbException;
@@ -514,7 +505,7 @@ interface Database {
* Returns a collection of transport acks for the given contact, or null if
* no acks are due.
*
- * Locking: transport write.
+ * Locking: write.
*/
Collection getTransportAcks(T txn, ContactId c)
throws DbException;
@@ -522,7 +513,7 @@ interface Database {
/**
* Returns the maximum latencies of all local transports.
*
- * Locking: transport read.
+ * Locking: read.
*/
Map getTransportLatencies(T txn) throws DbException;
@@ -531,7 +522,7 @@ interface Database {
* updates their expiry times using the given latency, or returns null if
* no updates are due.
*
- * Locking: transport write.
+ * Locking: write.
*/
Collection getTransportUpdates(T txn, ContactId c,
long maxLatency) throws DbException;
@@ -539,14 +530,14 @@ interface Database {
/**
* Returns the number of unread messages in each subscribed group.
*
- * Locking: message read.
+ * Locking: read.
*/
Map getUnreadMessageCounts(T txn) throws DbException;
/**
* Returns the IDs of all contacts to which the given group is visible.
*
- * Locking: subscription read.
+ * Locking: read.
*/
Collection getVisibility(T txn, GroupId g) throws DbException;
@@ -555,7 +546,7 @@ interface Database {
* in the given rotation period and returns the old value, or -1 if the
* counter does not exist.
*
- * Locking: window write.
+ * Locking: write.
*/
long incrementConnectionCounter(T txn, ContactId c, TransportId t,
long period) throws DbException;
@@ -564,7 +555,7 @@ interface Database {
* Increments the retention time versions for all contacts to indicate that
* the database's retention time has changed and updates should be sent.
*
- * Locking: retention write.
+ * Locking: write.
*/
void incrementRetentionVersions(T txn) throws DbException;
@@ -572,7 +563,7 @@ interface Database {
* Marks the given messages as not needing to be acknowledged to the
* given contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void lowerAckFlag(T txn, ContactId c, Collection acked)
throws DbException;
@@ -581,7 +572,7 @@ interface Database {
* Marks the given messages as not having been requested by the given
* contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void lowerRequestedFlag(T txn, ContactId c, Collection requested)
throws DbException;
@@ -590,7 +581,7 @@ interface Database {
* Merges the given configuration with the existing configuration for the
* given transport.
*
- * Locking: transport write.
+ * Locking: write.
*/
void mergeConfig(T txn, TransportId t, TransportConfig config)
throws DbException;
@@ -599,7 +590,7 @@ interface Database {
* Merges the given properties with the existing local properties for the
* given transport.
*
- * Locking: transport write.
+ * Locking: write.
*/
void mergeLocalProperties(T txn, TransportId t, TransportProperties p)
throws DbException;
@@ -607,36 +598,35 @@ interface Database {
/**
* Merges the given settings with the existing settings.
*
- * Locking: setting write.
+ * Locking: write.
*/
void mergeSettings(T txn, Settings s) throws DbException;
/**
* Marks a message as needing to be acknowledged to the given contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void raiseAckFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been requested by the given contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void raiseRequestedFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been seen by the given contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a contact from the database.
*
- * Locking: contact write, message write, retention write,
- * subscription write, transport write, window write.
+ * Locking: write.
*/
void removeContact(T txn, ContactId c) throws DbException;
@@ -644,7 +634,7 @@ interface Database {
* Unsubscribes from a group. Any messages belonging to the group are
* deleted from the database.
*
- * Locking: message write, subscription write.
+ * Locking: write.
*/
void removeGroup(T txn, GroupId g) throws DbException;
@@ -652,15 +642,14 @@ interface Database {
* Removes a local pseudonym (and all associated contacts) from the
* database.
*
- * Locking: contact write, identity write, message write, retention write,
- * subscription write, transport write, window write.
+ * Locking: write.
*/
void removeLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Removes a message (and all associated state) from the database.
*
- * Locking: message write.
+ * Locking: write.
*/
void removeMessage(T txn, MessageId m) throws DbException;
@@ -668,7 +657,7 @@ interface Database {
* Removes an offered message that was offered by the given contact, or
* returns false if there is no such message.
*
- * Locking: message write.
+ * Locking: write.
*/
boolean removeOfferedMessage(T txn, ContactId c, MessageId m)
throws DbException;
@@ -677,7 +666,7 @@ interface Database {
* Removes the given offered messages that were offered by the given
* contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void removeOfferedMessages(T txn, ContactId c,
Collection requested) throws DbException;
@@ -685,14 +674,14 @@ interface Database {
/**
* Removes a transport (and all associated state) from the database.
*
- * Locking: transport write, window write.
+ * Locking: write.
*/
void removeTransport(T txn, TransportId t) throws DbException;
/**
* Makes a group invisible to the given contact.
*
- * Locking: subscription write.
+ * Locking: write.
*/
void removeVisibility(T txn, ContactId c, GroupId g) throws DbException;
@@ -700,7 +689,7 @@ interface Database {
* Resets the transmission count and expiry time of the given message with
* respect to the given contact.
*
- * Locking: message write.
+ * Locking: write.
*/
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
@@ -708,7 +697,7 @@ interface Database {
* Sets the connection reordering window for the given endpoint in the
* given rotation period.
*
- * Locking: window write.
+ * Locking: write.
*/
void setConnectionWindow(T txn, ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException;
@@ -718,7 +707,7 @@ interface Database {
* true, unless an update with an equal or higher version number has
* already been received from the contact.
*
- * Locking: message write, subscription write.
+ * Locking: write.
*/
boolean setGroups(T txn, ContactId c, Collection groups,
long version) throws DbException;
@@ -727,14 +716,14 @@ interface Database {
* Makes a group visible to the given contact, adds it to the contact's
* subscriptions, and sets it as the inbox group for the contact.
*
- * Locking: subscription write.
+ * Locking: write.
*/
public void setInboxGroup(T txn, ContactId c, Group g) throws DbException;
/**
* Marks a message as read or unread.
*
- * Locking: message write.
+ * Locking: write.
*/
void setReadFlag(T txn, MessageId m, boolean read) throws DbException;
@@ -742,7 +731,7 @@ interface Database {
* Sets the remote transport properties for the given contact, replacing
* any existing properties.
*
- * Locking: transport write.
+ * Locking: write.
*/
void setRemoteProperties(T txn, ContactId c,
Map p) throws DbException;
@@ -753,7 +742,7 @@ interface Database {
* unless an update with an equal or higher version number has already been
* received from the contact.
*
- * Locking: transport write.
+ * Locking: write.
*/
boolean setRemoteProperties(T txn, ContactId c, TransportId t,
TransportProperties p, long version) throws DbException;
@@ -763,7 +752,7 @@ interface Database {
* true, unless an update with an equal or higher version number has
* already been received from the contact.
*
- * Locking: retention write.
+ * Locking: write.
*/
boolean setRetentionTime(T txn, ContactId c, long retention, long version)
throws DbException;
@@ -772,7 +761,7 @@ interface Database {
* Records a retention ack from the given contact for the given version,
* unless the contact has already acked an equal or higher version.
*
- * Locking: retention write.
+ * Locking: write.
*/
void setRetentionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
@@ -781,7 +770,7 @@ interface Database {
* Records a subscription ack from the given contact for the given version,
* unless the contact has already acked an equal or higher version.
*
- * Locking: subscription write.
+ * Locking: write.
*/
void setSubscriptionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
@@ -790,7 +779,7 @@ interface Database {
* Records a transport ack from the give contact for the given version,
* unless the contact has already acked an equal or higher version.
*
- * Locking: transport write.
+ * Locking: write.
*/
void setTransportUpdateAcked(T txn, ContactId c, TransportId t,
long version) throws DbException;
@@ -798,7 +787,7 @@ interface Database {
/**
* Makes a group visible or invisible to future contacts by default.
*
- * Locking: subscription write.
+ * Locking: write.
*/
void setVisibleToAll(T txn, GroupId g, boolean all) throws DbException;
@@ -807,7 +796,7 @@ interface Database {
* with respect to the given contact, using the latency of the transport
* over which it was sent.
*
- * Locking: message write.
+ * Locking: write.
*/
void updateExpiryTime(T txn, ContactId c, MessageId m, long maxLatency)
throws DbException;
diff --git a/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java b/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java
index ed245931e..79113fbd4 100644
--- a/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java
+++ b/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java
@@ -93,38 +93,17 @@ DatabaseCleaner.Callback {
Logger.getLogger(DatabaseComponentImpl.class.getName());
private static final int MS_BETWEEN_SWEEPS = 10 * 1000; // 10 seconds
- /*
- * Locks must always be acquired in alphabetical order. See the Database
- * interface to find out which calls require which locks.
- */
-
- private final ReentrantReadWriteLock contactLock =
- new ReentrantReadWriteLock(true);
- private final ReentrantReadWriteLock identityLock =
- new ReentrantReadWriteLock(true);
- private final ReentrantReadWriteLock messageLock =
- new ReentrantReadWriteLock(true);
- private final ReentrantReadWriteLock retentionLock =
- new ReentrantReadWriteLock(true);
- private final ReentrantReadWriteLock settingLock =
- new ReentrantReadWriteLock(true);
- private final ReentrantReadWriteLock subscriptionLock =
- new ReentrantReadWriteLock(true);
- private final ReentrantReadWriteLock transportLock =
- new ReentrantReadWriteLock(true);
- private final ReentrantReadWriteLock windowLock =
- new ReentrantReadWriteLock(true);
-
private final Database db;
private final DatabaseCleaner cleaner;
private final ShutdownManager shutdown;
+ private final ReentrantReadWriteLock lock =
+ new ReentrantReadWriteLock(true);
private final Collection listeners =
new CopyOnWriteArrayList();
- private final Object openCloseLock = new Object();
- private boolean open = false; // Locking: openCloseLock;
- private int shutdownHandle = -1; // Locking: openCloseLock;
+ private boolean open = false; // Locking: lock.writeLock
+ private int shutdownHandle = -1; // Locking: lock.writeLock
@Inject
DatabaseComponentImpl(Database db, DatabaseCleaner cleaner,
@@ -135,39 +114,47 @@ DatabaseCleaner.Callback {
}
public boolean open() throws DbException, IOException {
- synchronized(openCloseLock) {
+ Runnable shutdownHook = new Runnable() {
+ public void run() {
+ lock.writeLock().lock();
+ try {
+ shutdownHandle = -1;
+ close();
+ } catch(DbException e) {
+ if(LOG.isLoggable(WARNING))
+ LOG.log(WARNING, e.toString(), e);
+ } catch(IOException e) {
+ if(LOG.isLoggable(WARNING))
+ LOG.log(WARNING, e.toString(), e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ };
+ lock.writeLock().lock();
+ try {
if(open) throw new IllegalStateException();
open = true;
boolean reopened = db.open();
cleaner.startCleaning(this, MS_BETWEEN_SWEEPS);
- shutdownHandle = shutdown.addShutdownHook(new Runnable() {
- public void run() {
- try {
- synchronized(openCloseLock) {
- shutdownHandle = -1;
- close();
- }
- } catch(DbException e) {
- if(LOG.isLoggable(WARNING))
- LOG.log(WARNING, e.toString(), e);
- } catch(IOException e) {
- if(LOG.isLoggable(WARNING))
- LOG.log(WARNING, e.toString(), e);
- }
- }
- });
+ shutdownHandle = shutdown.addShutdownHook(shutdownHook);
return reopened;
+ } finally {
+ lock.writeLock().unlock();
}
}
public void close() throws DbException, IOException {
- synchronized(openCloseLock) {
+ lock.writeLock().lock();
+ try {
if(!open) return;
open = false;
if(shutdownHandle != -1)
shutdown.removeShutdownHook(shutdownHandle);
cleaner.stopCleaning();
db.close();
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -179,208 +166,119 @@ DatabaseCleaner.Callback {
listeners.remove(l);
}
- public ContactId addContact(Author remote, AuthorId local)
- throws DbException {
- ContactId c;
- contactLock.writeLock().lock();
- try {
- identityLock.readLock().lock();
- try {
- messageLock.writeLock().lock();
- try {
- retentionLock.writeLock().lock();
- try {
- subscriptionLock.writeLock().lock();
- try {
- transportLock.writeLock().lock();
- try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(db.containsContact(txn, remote.getId()))
- throw new ContactExistsException();
- if(!db.containsLocalAuthor(txn, local))
- throw new NoSuchLocalAuthorException();
- c = db.addContact(txn, remote, local);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
- }
- } finally {
- transportLock.writeLock().unlock();
- }
- } finally {
- subscriptionLock.writeLock().unlock();
- }
- } finally {
- retentionLock.writeLock().unlock();
- }
- } finally {
- messageLock.writeLock().unlock();
- }
- } finally {
- identityLock.readLock().unlock();
- }
- } finally {
- contactLock.writeLock().unlock();
- }
- callListeners(new ContactAddedEvent(c));
- return c;
- }
-
/** Notifies all listeners of a database event. */
private void callListeners(Event e) {
for(EventListener l : listeners) l.eventOccurred(e);
}
- public void addEndpoint(Endpoint ep) throws DbException {
- contactLock.readLock().lock();
+ public ContactId addContact(Author remote, AuthorId local)
+ throws DbException {
+ ContactId c;
+ lock.writeLock().lock();
try {
- transportLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, ep.getContactId()))
- throw new NoSuchContactException();
- if(!db.containsTransport(txn, ep.getTransportId()))
- throw new NoSuchTransportException();
- db.addEndpoint(txn, ep);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
- }
- } finally {
- transportLock.readLock().unlock();
+ if(db.containsContact(txn, remote.getId()))
+ throw new ContactExistsException();
+ if(!db.containsLocalAuthor(txn, local))
+ throw new NoSuchLocalAuthorException();
+ c = db.addContact(txn, remote, local);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
+ }
+ callListeners(new ContactAddedEvent(c));
+ return c;
+ }
+
+ public void addEndpoint(Endpoint ep) throws DbException {
+ lock.writeLock().lock();
+ try {
+ T txn = db.startTransaction();
+ try {
+ if(!db.containsContact(txn, ep.getContactId()))
+ throw new NoSuchContactException();
+ if(!db.containsTransport(txn, ep.getTransportId()))
+ throw new NoSuchTransportException();
+ db.addEndpoint(txn, ep);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
+ }
+ } finally {
+ lock.writeLock().unlock();
}
}
public boolean addGroup(Group g) throws DbException {
boolean added = false;
- messageLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsGroup(txn, g.getId()))
- added = db.addGroup(txn, g);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.writeLock().unlock();
+ if(!db.containsGroup(txn, g.getId()))
+ added = db.addGroup(txn, g);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- messageLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
if(added) callListeners(new SubscriptionAddedEvent(g));
return added;
}
public void addLocalAuthor(LocalAuthor a) throws DbException {
- contactLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- identityLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- messageLock.writeLock().lock();
- try {
- retentionLock.writeLock().lock();
- try {
- subscriptionLock.writeLock().lock();
- try {
- transportLock.writeLock().lock();
- try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(db.containsLocalAuthor(txn, a.getId()))
- throw new LocalAuthorExistsException();
- db.addLocalAuthor(txn, a);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
- }
- } finally {
- transportLock.writeLock().unlock();
- }
- } finally {
- subscriptionLock.writeLock().unlock();
- }
- } finally {
- retentionLock.writeLock().unlock();
- }
- } finally {
- messageLock.writeLock().unlock();
- }
- } finally {
- identityLock.writeLock().unlock();
+ if(db.containsLocalAuthor(txn, a.getId()))
+ throw new LocalAuthorExistsException();
+ db.addLocalAuthor(txn, a);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
callListeners(new LocalAuthorAddedEvent(a.getId()));
}
public void addLocalMessage(Message m) throws DbException {
boolean duplicate;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- subscriptionLock.readLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- duplicate = db.containsMessage(txn, m.getId());
- if(!duplicate) {
- GroupId g = m.getGroup().getId();
- if(db.containsGroup(txn, g))
- addMessage(txn, m, null);
- }
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
- }
- } finally {
- messageLock.writeLock().unlock();
+ duplicate = db.containsMessage(txn, m.getId());
+ if(!duplicate && db.containsGroup(txn, m.getGroup().getId()))
+ addMessage(txn, m, null);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
- if(!duplicate)
- callListeners(new MessageAddedEvent(m.getGroup(), null));
+ if(!duplicate) callListeners(new MessageAddedEvent(m.getGroup(), null));
}
/**
* Stores a message, initialises its status with respect to each contact,
* and marks it as read if it was locally generated.
*
- * Locking: contact read, message write, subscription read.
+ * Locking: write.
* @param sender null for a locally generated message.
*/
private void addMessage(T txn, Message m, ContactId sender)
@@ -408,60 +306,43 @@ DatabaseCleaner.Callback {
public void addSecrets(Collection secrets)
throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- Collection relevant =
- new ArrayList();
- for(TemporarySecret s : secrets) {
- ContactId c = s.getContactId();
- if(!db.containsContact(txn, c)) continue;
- TransportId t = s.getTransportId();
- if(!db.containsTransport(txn, t)) continue;
+ Collection relevant =
+ new ArrayList();
+ for(TemporarySecret s : secrets) {
+ if(db.containsContact(txn, s.getContactId()))
+ if(db.containsTransport(txn, s.getTransportId()))
relevant.add(s);
- }
- if(!secrets.isEmpty()) db.addSecrets(txn, relevant);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
}
- } finally {
- transportLock.readLock().unlock();
+ if(!secrets.isEmpty()) db.addSecrets(txn, relevant);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public boolean addTransport(TransportId t, long maxLatency)
throws DbException {
boolean added;
- transportLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- windowLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- added = db.addTransport(txn, t, maxLatency);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
+ added = db.addTransport(txn, t, maxLatency);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- transportLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
if(added) callListeners(new TransportAddedEvent(t, maxLatency));
return added;
@@ -469,26 +350,21 @@ DatabaseCleaner.Callback {
public Ack generateAck(ContactId c, int maxMessages) throws DbException {
Collection ids;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- ids = db.getMessagesToAck(txn, c, maxMessages);
- if(!ids.isEmpty()) db.lowerAckFlag(txn, c, ids);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- messageLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ ids = db.getMessagesToAck(txn, c, maxMessages);
+ if(!ids.isEmpty()) db.lowerAckFlag(txn, c, ids);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(ids.isEmpty()) return null;
return new Ack(ids);
@@ -498,35 +374,25 @@ DatabaseCleaner.Callback {
long maxLatency) throws DbException {
Collection ids;
List messages = new ArrayList();
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- subscriptionLock.readLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- ids = db.getMessagesToSend(txn, c, maxLength);
- for(MessageId m : ids) {
- messages.add(db.getRawMessage(txn, m));
- db.updateExpiryTime(txn, c, m, maxLatency);
- }
- if(!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ ids = db.getMessagesToSend(txn, c, maxLength);
+ for(MessageId m : ids) {
+ messages.add(db.getRawMessage(txn, m));
+ db.updateExpiryTime(txn, c, m, maxLatency);
}
- } finally {
- messageLock.writeLock().unlock();
+ if(!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(messages.isEmpty()) return null;
return Collections.unmodifiableList(messages);
@@ -535,32 +401,22 @@ DatabaseCleaner.Callback {
public Offer generateOffer(ContactId c, int maxMessages, long maxLatency)
throws DbException {
Collection ids;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- subscriptionLock.readLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- ids = db.getMessagesToOffer(txn, c, maxMessages);
- for(MessageId m : ids)
- db.updateExpiryTime(txn, c, m, maxLatency);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
- }
- } finally {
- messageLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ ids = db.getMessagesToOffer(txn, c, maxMessages);
+ for(MessageId m : ids)
+ db.updateExpiryTime(txn, c, m, maxLatency);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(ids.isEmpty()) return null;
return new Offer(ids);
@@ -569,26 +425,21 @@ DatabaseCleaner.Callback {
public Request generateRequest(ContactId c, int maxMessages)
throws DbException {
Collection ids;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- ids = db.getMessagesToRequest(txn, c, maxMessages);
- if(!ids.isEmpty()) db.removeOfferedMessages(txn, c, ids);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- messageLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ ids = db.getMessagesToRequest(txn, c, maxMessages);
+ if(!ids.isEmpty()) db.removeOfferedMessages(txn, c, ids);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(ids.isEmpty()) return null;
return new Request(ids);
@@ -598,199 +449,153 @@ DatabaseCleaner.Callback {
long maxLatency) throws DbException {
Collection ids;
List messages = new ArrayList();
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- subscriptionLock.readLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- ids = db.getRequestedMessagesToSend(txn, c, maxLength);
- for(MessageId m : ids) {
- messages.add(db.getRawMessage(txn, m));
- db.updateExpiryTime(txn, c, m, maxLatency);
- }
- if(!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ ids = db.getRequestedMessagesToSend(txn, c, maxLength);
+ for(MessageId m : ids) {
+ messages.add(db.getRawMessage(txn, m));
+ db.updateExpiryTime(txn, c, m, maxLatency);
}
- } finally {
- messageLock.writeLock().unlock();
+ if(!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(messages.isEmpty()) return null;
return Collections.unmodifiableList(messages);
}
public RetentionAck generateRetentionAck(ContactId c) throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- retentionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- RetentionAck a = db.getRetentionAck(txn, c);
- db.commitTransaction(txn);
- return a;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- retentionLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ RetentionAck a = db.getRetentionAck(txn, c);
+ db.commitTransaction(txn);
+ return a;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public RetentionUpdate generateRetentionUpdate(ContactId c, long maxLatency)
throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- retentionLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- RetentionUpdate u =
- db.getRetentionUpdate(txn, c, maxLatency);
- db.commitTransaction(txn);
- return u;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- retentionLock.writeLock().unlock();
- }
- } finally {
- messageLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ RetentionUpdate u = db.getRetentionUpdate(txn, c, maxLatency);
+ db.commitTransaction(txn);
+ return u;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public SubscriptionAck generateSubscriptionAck(ContactId c)
throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- SubscriptionAck a = db.getSubscriptionAck(txn, c);
- db.commitTransaction(txn);
- return a;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ SubscriptionAck a = db.getSubscriptionAck(txn, c);
+ db.commitTransaction(txn);
+ return a;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c,
long maxLatency) throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- SubscriptionUpdate u =
- db.getSubscriptionUpdate(txn, c, maxLatency);
- db.commitTransaction(txn);
- return u;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ SubscriptionUpdate u =
+ db.getSubscriptionUpdate(txn, c, maxLatency);
+ db.commitTransaction(txn);
+ return u;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public Collection generateTransportAcks(ContactId c)
throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- Collection acks = db.getTransportAcks(txn, c);
- db.commitTransaction(txn);
- return acks;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- transportLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ Collection acks = db.getTransportAcks(txn, c);
+ db.commitTransaction(txn);
+ return acks;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public Collection generateTransportUpdates(ContactId c,
long maxLatency) throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- Collection updates =
- db.getTransportUpdates(txn, c, maxLatency);
- db.commitTransaction(txn);
- return updates;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- transportLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ Collection updates =
+ db.getTransportUpdates(txn, c, maxLatency);
+ db.commitTransaction(txn);
+ return updates;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public Collection getAvailableGroups() throws DbException {
- subscriptionLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -802,12 +607,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- subscriptionLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public TransportConfig getConfig(TransportId t) throws DbException {
- transportLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -821,12 +626,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- transportLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Contact getContact(ContactId c) throws DbException {
- contactLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -840,34 +645,29 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getContacts() throws DbException {
- contactLock.readLock().lock();
+ lock.readLock().lock();
try {
- windowLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- Collection contacts = db.getContacts(txn);
- db.commitTransaction(txn);
- return contacts;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.readLock().unlock();
+ Collection contacts = db.getContacts(txn);
+ db.commitTransaction(txn);
+ return contacts;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Group getGroup(GroupId g) throws DbException {
- subscriptionLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -881,12 +681,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- subscriptionLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getGroups() throws DbException {
- subscriptionLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -898,72 +698,52 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- subscriptionLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public GroupId getInboxGroupId(ContactId c) throws DbException {
- contactLock.readLock().lock();
+ lock.readLock().lock();
try {
- subscriptionLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- GroupId inbox = db.getInboxGroupId(txn, c);
- db.commitTransaction(txn);
- return inbox;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ GroupId inbox = db.getInboxGroupId(txn, c);
+ db.commitTransaction(txn);
+ return inbox;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getInboxMessageHeaders(ContactId c)
throws DbException {
- contactLock.readLock().lock();
+ lock.readLock().lock();
try {
- identityLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- messageLock.readLock().lock();
- try {
- subscriptionLock.readLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- Collection headers =
- db.getInboxMessageHeaders(txn, c);
- db.commitTransaction(txn);
- return headers;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
- }
- } finally {
- messageLock.readLock().unlock();
- }
- } finally {
- identityLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ Collection headers =
+ db.getInboxMessageHeaders(txn, c);
+ db.commitTransaction(txn);
+ return headers;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public LocalAuthor getLocalAuthor(AuthorId a) throws DbException {
- identityLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -977,12 +757,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- identityLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getLocalAuthors() throws DbException {
- identityLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -994,13 +774,13 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- identityLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Map getLocalProperties()
throws DbException {
- transportLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1013,13 +793,13 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- transportLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
- transportLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1033,12 +813,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- transportLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public byte[] getMessageBody(MessageId m) throws DbException {
- messageLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1052,38 +832,33 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- messageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getMessageHeaders(GroupId g)
throws DbException {
- messageLock.readLock().lock();
+ lock.readLock().lock();
try {
- subscriptionLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsGroup(txn, g))
- throw new NoSuchSubscriptionException();
- Collection headers =
- db.getMessageHeaders(txn, g);
- db.commitTransaction(txn);
- return headers;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
+ if(!db.containsGroup(txn, g))
+ throw new NoSuchSubscriptionException();
+ Collection headers =
+ db.getMessageHeaders(txn, g);
+ db.commitTransaction(txn);
+ return headers;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- messageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public boolean getReadFlag(MessageId m) throws DbException {
- messageLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1097,13 +872,13 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- messageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Map getRemoteProperties(
TransportId t) throws DbException {
- transportLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1116,12 +891,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- transportLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getSecrets() throws DbException {
- windowLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1133,12 +908,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- windowLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Settings getSettings() throws DbException {
- settingLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1150,12 +925,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- settingLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getSubscribers(GroupId g) throws DbException {
- subscriptionLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1167,12 +942,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- subscriptionLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Map getTransportLatencies() throws DbException {
- transportLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1185,12 +960,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- transportLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Map getUnreadMessageCounts() throws DbException {
- messageLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1202,12 +977,12 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- messageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Collection getVisibility(GroupId g) throws DbException {
- subscriptionLock.readLock().lock();
+ lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1221,46 +996,35 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- subscriptionLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public long incrementConnectionCounter(ContactId c, TransportId t,
long period) throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- if(!db.containsTransport(txn, t))
- throw new NoSuchTransportException();
- long counter = db.incrementConnectionCounter(txn, c, t,
- period);
- db.commitTransaction(txn);
- return counter;
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
- }
- } finally {
- transportLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ if(!db.containsTransport(txn, t))
+ throw new NoSuchTransportException();
+ long counter = db.incrementConnectionCounter(txn, c, t, period);
+ db.commitTransaction(txn);
+ return counter;
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public void mergeConfig(TransportId t, TransportConfig c)
throws DbException {
- transportLock.writeLock().lock();
+ lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1273,14 +1037,14 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- transportLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
}
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
boolean changed = false;
- transportLock.writeLock().lock();
+ lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1296,14 +1060,14 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- transportLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
if(changed) callListeners(new LocalTransportsUpdatedEvent());
}
public void mergeSettings(Settings s) throws DbException {
boolean changed = false;
- settingLock.writeLock().lock();
+ lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1317,113 +1081,92 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- settingLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
if(changed) callListeners(new SettingsUpdatedEvent());
}
public void receiveAck(ContactId c, Ack a) throws DbException {
Collection acked = new ArrayList();
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- for(MessageId m : a.getMessageIds()) {
- if(db.containsVisibleMessage(txn, c, m)) {
- db.raiseSeenFlag(txn, c, m);
- acked.add(m);
- }
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ for(MessageId m : a.getMessageIds()) {
+ if(db.containsVisibleMessage(txn, c, m)) {
+ db.raiseSeenFlag(txn, c, m);
+ acked.add(m);
}
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
}
- } finally {
- messageLock.writeLock().unlock();
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
callListeners(new MessagesAckedEvent(c, acked));
}
public void receiveMessage(ContactId c, Message m) throws DbException {
boolean duplicate, visible;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- subscriptionLock.readLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- duplicate = db.containsMessage(txn, m.getId());
- GroupId g = m.getGroup().getId();
- visible = db.containsVisibleGroup(txn, c, g);
- if(!duplicate && visible) addMessage(txn, m, c);
- if(visible) db.raiseAckFlag(txn, c, m.getId());
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ duplicate = db.containsMessage(txn, m.getId());
+ visible = db.containsVisibleGroup(txn, c, m.getGroup().getId());
+ if(visible) {
+ if(!duplicate) addMessage(txn, m, c);
+ db.raiseAckFlag(txn, c, m.getId());
}
- } finally {
- messageLock.writeLock().unlock();
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
+ }
+ if(visible) {
+ if(!duplicate)
+ callListeners(new MessageAddedEvent(m.getGroup(), c));
+ callListeners(new MessageToAckEvent(c));
}
- if(visible) callListeners(new MessageToAckEvent(c));
- if(!duplicate) callListeners(new MessageAddedEvent(m.getGroup(), c));
}
public void receiveOffer(ContactId c, Offer o) throws DbException {
boolean ack = false, request = false;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- subscriptionLock.readLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- int count = db.countOfferedMessages(txn, c);
- for(MessageId m : o.getMessageIds()) {
- if(db.containsVisibleMessage(txn, c, m)) {
- db.raiseSeenFlag(txn, c, m);
- db.raiseAckFlag(txn, c, m);
- ack = true;
- } else if(count < MAX_OFFERED_MESSAGES) {
- db.addOfferedMessage(txn, c, m);
- request = true;
- count++;
- }
- }
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ int count = db.countOfferedMessages(txn, c);
+ for(MessageId m : o.getMessageIds()) {
+ if(db.containsVisibleMessage(txn, c, m)) {
+ db.raiseSeenFlag(txn, c, m);
+ db.raiseAckFlag(txn, c, m);
+ ack = true;
+ } else if(count < MAX_OFFERED_MESSAGES) {
+ db.addOfferedMessage(txn, c, m);
+ request = true;
+ count++;
}
- } finally {
- subscriptionLock.readLock().unlock();
}
- } finally {
- messageLock.writeLock().unlock();
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(ack) callListeners(new MessageToAckEvent(c));
if(request) callListeners(new MessageToRequestEvent(c));
@@ -1431,269 +1174,196 @@ DatabaseCleaner.Callback {
public void receiveRequest(ContactId c, Request r) throws DbException {
boolean requested = false;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- for(MessageId m : r.getMessageIds()) {
- if(db.containsVisibleMessage(txn, c, m)) {
- db.raiseRequestedFlag(txn, c, m);
- db.resetExpiryTime(txn, c, m);
- requested = true;
- }
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ for(MessageId m : r.getMessageIds()) {
+ if(db.containsVisibleMessage(txn, c, m)) {
+ db.raiseRequestedFlag(txn, c, m);
+ db.resetExpiryTime(txn, c, m);
+ requested = true;
}
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
}
- } finally {
- messageLock.writeLock().unlock();
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(requested) callListeners(new MessageRequestedEvent(c));
}
public void receiveRetentionAck(ContactId c, RetentionAck a)
throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- retentionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- db.setRetentionUpdateAcked(txn, c, a.getVersion());
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- retentionLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ db.setRetentionUpdateAcked(txn, c, a.getVersion());
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public void receiveRetentionUpdate(ContactId c, RetentionUpdate u)
throws DbException {
boolean updated;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- retentionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- updated = db.setRetentionTime(txn, c, u.getRetentionTime(),
- u.getVersion());
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- retentionLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ long retention = u.getRetentionTime(), version = u.getVersion();
+ updated = db.setRetentionTime(txn, c, retention, version);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(updated) callListeners(new RemoteRetentionTimeUpdatedEvent(c));
}
public void receiveSubscriptionAck(ContactId c, SubscriptionAck a)
throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- db.setSubscriptionUpdateAcked(txn, c, a.getVersion());
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ db.setSubscriptionUpdateAcked(txn, c, a.getVersion());
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate u)
throws DbException {
boolean updated;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- subscriptionLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- Collection groups = u.getGroups();
- long version = u.getVersion();
- updated = db.setGroups(txn, c, groups, version);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.writeLock().unlock();
- }
- } finally {
- messageLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ updated = db.setGroups(txn, c, u.getGroups(), u.getVersion());
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(updated) callListeners(new RemoteSubscriptionsUpdatedEvent(c));
}
public void receiveTransportAck(ContactId c, TransportAck a)
throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- TransportId t = a.getId();
- if(!db.containsTransport(txn, t))
- throw new NoSuchTransportException();
- db.setTransportUpdateAcked(txn, c, t, a.getVersion());
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- transportLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ if(!db.containsTransport(txn, a.getId()))
+ throw new NoSuchTransportException();
+ db.setTransportUpdateAcked(txn, c, a.getId(), a.getVersion());
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public void receiveTransportUpdate(ContactId c, TransportUpdate u)
throws DbException {
boolean updated;
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- TransportId t = u.getId();
- TransportProperties p = u.getProperties();
- long version = u.getVersion();
- updated = db.setRemoteProperties(txn, c, t, p, version);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- transportLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ TransportId t = u.getId();
+ TransportProperties p = u.getProperties();
+ long version = u.getVersion();
+ updated = db.setRemoteProperties(txn, c, t, p, version);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(updated)
callListeners(new RemoteTransportsUpdatedEvent(c, u.getId()));
}
public void removeContact(ContactId c) throws DbException {
- contactLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- messageLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- retentionLock.writeLock().lock();
- try {
- subscriptionLock.writeLock().lock();
- try {
- transportLock.writeLock().lock();
- try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- GroupId g = db.getInboxGroupId(txn, c);
- if(g != null) db.removeGroup(txn, g);
- db.removeContact(txn, c);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
- }
- } finally {
- transportLock.writeLock().unlock();
- }
- } finally {
- subscriptionLock.writeLock().unlock();
- }
- } finally {
- retentionLock.writeLock().unlock();
- }
- } finally {
- messageLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ GroupId g = db.getInboxGroupId(txn, c);
+ if(g != null) db.removeGroup(txn, g);
+ db.removeContact(txn, c);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
callListeners(new ContactRemovedEvent(c));
}
public void removeGroup(Group g) throws DbException {
Collection affected;
- messageLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- GroupId id = g.getId();
- if(!db.containsGroup(txn, id))
- throw new NoSuchSubscriptionException();
- affected = db.getVisibility(txn, id);
- db.removeGroup(txn, id);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.writeLock().unlock();
+ GroupId id = g.getId();
+ if(!db.containsGroup(txn, id))
+ throw new NoSuchSubscriptionException();
+ affected = db.getVisibility(txn, id);
+ db.removeGroup(txn, id);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- messageLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
callListeners(new SubscriptionRemovedEvent(g));
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
@@ -1701,141 +1371,90 @@ DatabaseCleaner.Callback {
public void removeLocalAuthor(AuthorId a) throws DbException {
Collection affected;
- contactLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- identityLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- messageLock.writeLock().lock();
- try {
- retentionLock.writeLock().lock();
- try {
- subscriptionLock.writeLock().lock();
- try {
- transportLock.writeLock().lock();
- try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsLocalAuthor(txn, a))
- throw new NoSuchLocalAuthorException();
- affected = db.getContacts(txn, a);
- for(ContactId c : affected) {
- GroupId g = db.getInboxGroupId(txn, c);
- if(g != null) db.removeGroup(txn, g);
- }
- db.removeLocalAuthor(txn, a);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
- }
- } finally {
- transportLock.writeLock().unlock();
- }
- } finally {
- subscriptionLock.writeLock().unlock();
- }
- } finally {
- retentionLock.writeLock().unlock();
- }
- } finally {
- messageLock.writeLock().unlock();
+ if(!db.containsLocalAuthor(txn, a))
+ throw new NoSuchLocalAuthorException();
+ affected = db.getContacts(txn, a);
+ for(ContactId c : affected) {
+ GroupId g = db.getInboxGroupId(txn, c);
+ if(g != null) db.removeGroup(txn, g);
}
- } finally {
- identityLock.writeLock().unlock();
+ db.removeLocalAuthor(txn, a);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
for(ContactId c : affected) callListeners(new ContactRemovedEvent(c));
callListeners(new LocalAuthorRemovedEvent(a));
}
public void removeTransport(TransportId t) throws DbException {
- transportLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- windowLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsTransport(txn, t))
- throw new NoSuchTransportException();
- db.removeTransport(txn, t);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
+ if(!db.containsTransport(txn, t))
+ throw new NoSuchTransportException();
+ db.removeTransport(txn, t);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- transportLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
callListeners(new TransportRemovedEvent(t));
}
public void setConnectionWindow(ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.readLock().lock();
+ T txn = db.startTransaction();
try {
- windowLock.writeLock().lock();
- try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- if(!db.containsTransport(txn, t))
- throw new NoSuchTransportException();
- db.setConnectionWindow(txn, c, t, period, centre,
- bitmap);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- windowLock.writeLock().unlock();
- }
- } finally {
- transportLock.readLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ if(!db.containsTransport(txn, t))
+ throw new NoSuchTransportException();
+ db.setConnectionWindow(txn, c, t, period, centre, bitmap);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public void setInboxGroup(ContactId c, Group g) throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- db.setInboxGroup(txn, c, g);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- subscriptionLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ db.setInboxGroup(txn, c, g);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public void setReadFlag(MessageId m, boolean read) throws DbException {
- messageLock.writeLock().lock();
+ lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
@@ -1848,73 +1467,63 @@ DatabaseCleaner.Callback {
throw e;
}
} finally {
- messageLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
}
public void setRemoteProperties(ContactId c,
Map p) throws DbException {
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- transportLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsContact(txn, c))
- throw new NoSuchContactException();
- db.setRemoteProperties(txn, c, p);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
- }
- } finally {
- transportLock.writeLock().unlock();
+ if(!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ db.setRemoteProperties(txn, c, p);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
public void setVisibility(GroupId g, Collection visible)
throws DbException {
Collection affected = new ArrayList();
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsGroup(txn, g))
- throw new NoSuchSubscriptionException();
- // Use HashSets for O(1) lookups, O(n) overall running time
- HashSet now = new HashSet(visible);
- Collection before = db.getVisibility(txn, g);
- before = new HashSet(before);
- // Set the group's visibility for each current contact
- for(ContactId c : db.getContactIds(txn)) {
- boolean wasBefore = before.contains(c);
- boolean isNow = now.contains(c);
- if(!wasBefore && isNow) {
- db.addVisibility(txn, c, g);
- affected.add(c);
- } else if(wasBefore && !isNow) {
- db.removeVisibility(txn, c, g);
- affected.add(c);
- }
+ if(!db.containsGroup(txn, g))
+ throw new NoSuchSubscriptionException();
+ // Use HashSets for O(1) lookups, O(n) overall running time
+ HashSet now = new HashSet(visible);
+ Collection before = db.getVisibility(txn, g);
+ before = new HashSet(before);
+ // Set the group's visibility for each current contact
+ for(ContactId c : db.getContactIds(txn)) {
+ boolean wasBefore = before.contains(c);
+ boolean isNow = now.contains(c);
+ if(!wasBefore && isNow) {
+ db.addVisibility(txn, c, g);
+ affected.add(c);
+ } else if(wasBefore && !isNow) {
+ db.removeVisibility(txn, c, g);
+ affected.add(c);
}
- // Make the group invisible to future contacts
- db.setVisibleToAll(txn, g, false);
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
}
- } finally {
- subscriptionLock.writeLock().unlock();
+ // Make the group invisible to future contacts
+ db.setVisibleToAll(txn, g, false);
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(!affected.isEmpty())
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
@@ -1922,37 +1531,32 @@ DatabaseCleaner.Callback {
public void setVisibleToAll(GroupId g, boolean all) throws DbException {
Collection affected = new ArrayList();
- contactLock.readLock().lock();
+ lock.writeLock().lock();
try {
- subscriptionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- if(!db.containsGroup(txn, g))
- throw new NoSuchSubscriptionException();
- // Make the group visible or invisible to future contacts
- db.setVisibleToAll(txn, g, all);
- if(all) {
- // Make the group visible to all current contacts
- Collection before = db.getVisibility(txn, g);
- before = new HashSet(before);
- for(ContactId c : db.getContactIds(txn)) {
- if(!before.contains(c)) {
- db.addVisibility(txn, c, g);
- affected.add(c);
- }
+ if(!db.containsGroup(txn, g))
+ throw new NoSuchSubscriptionException();
+ // Make the group visible or invisible to future contacts
+ db.setVisibleToAll(txn, g, all);
+ if(all) {
+ // Make the group visible to all current contacts
+ Collection before = db.getVisibility(txn, g);
+ before = new HashSet(before);
+ for(ContactId c : db.getContactIds(txn)) {
+ if(!before.contains(c)) {
+ db.addVisibility(txn, c, g);
+ affected.add(c);
}
}
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
}
- } finally {
- subscriptionLock.writeLock().unlock();
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- contactLock.readLock().unlock();
+ lock.writeLock().unlock();
}
if(!affected.isEmpty())
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
@@ -1979,29 +1583,24 @@ DatabaseCleaner.Callback {
*/
private boolean expireMessages(int size) throws DbException {
Collection expired;
- messageLock.writeLock().lock();
+ lock.writeLock().lock();
try {
- retentionLock.writeLock().lock();
+ T txn = db.startTransaction();
try {
- T txn = db.startTransaction();
- try {
- expired = db.getOldMessages(txn, size);
- if(!expired.isEmpty()) {
- for(MessageId m : expired) db.removeMessage(txn, m);
- db.incrementRetentionVersions(txn);
- if(LOG.isLoggable(INFO))
- LOG.info("Expired " + expired.size() + " messages");
- }
- db.commitTransaction(txn);
- } catch(DbException e) {
- db.abortTransaction(txn);
- throw e;
+ expired = db.getOldMessages(txn, size);
+ if(!expired.isEmpty()) {
+ for(MessageId m : expired) db.removeMessage(txn, m);
+ db.incrementRetentionVersions(txn);
+ if(LOG.isLoggable(INFO))
+ LOG.info("Expired " + expired.size() + " messages");
}
- } finally {
- retentionLock.writeLock().unlock();
+ db.commitTransaction(txn);
+ } catch(DbException e) {
+ db.abortTransaction(txn);
+ throw e;
}
} finally {
- messageLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
if(expired.isEmpty()) return false;
callListeners(new MessageExpiredEvent());
diff --git a/briar-core/src/org/briarproject/db/DatabaseModule.java b/briar-core/src/org/briarproject/db/DatabaseModule.java
index 10edb618a..11719acb6 100644
--- a/briar-core/src/org/briarproject/db/DatabaseModule.java
+++ b/briar-core/src/org/briarproject/db/DatabaseModule.java
@@ -25,9 +25,6 @@ import com.google.inject.Provides;
public class DatabaseModule extends AbstractModule {
- /** The maximum number of executor threads. */
- private static final int MAX_EXECUTOR_THREADS = 10;
-
private final ExecutorService databaseExecutor;
public DatabaseModule() {
@@ -36,9 +33,9 @@ public class DatabaseModule extends AbstractModule {
// Discard tasks that are submitted during shutdown
RejectedExecutionHandler policy =
new ThreadPoolExecutor.DiscardPolicy();
- // Create a limited # of threads and keep them in the pool for 60 secs
- databaseExecutor = new ThreadPoolExecutor(0, MAX_EXECUTOR_THREADS,
- 60, SECONDS, queue, policy);
+ // Use a single thread and keep it in the pool for 60 secs
+ databaseExecutor = new ThreadPoolExecutor(0, 1, 60, SECONDS, queue,
+ policy);
}
protected void configure() {
diff --git a/briar-core/src/org/briarproject/db/JdbcDatabase.java b/briar-core/src/org/briarproject/db/JdbcDatabase.java
index 42214788e..5d51ad104 100644
--- a/briar-core/src/org/briarproject/db/JdbcDatabase.java
+++ b/briar-core/src/org/briarproject/db/JdbcDatabase.java
@@ -71,8 +71,6 @@ abstract class JdbcDatabase implements Database {
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (key))";
- // Locking: identity
- // Dependents: contact, message, retention, subscription, transport, window
private static final String CREATE_LOCAL_AUTHORS =
"CREATE TABLE localAuthors"
+ " (authorId HASH NOT NULL,"
@@ -82,8 +80,6 @@ abstract class JdbcDatabase implements Database {
+ " created BIGINT NOT NULL,"
+ " PRIMARY KEY (authorId))";
- // Locking: contact
- // Dependents: message, retention, subscription, transport, window
private static final String CREATE_CONTACTS =
"CREATE TABLE contacts"
+ " (contactId COUNTER,"
@@ -97,8 +93,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES localAuthors (authorId)"
+ " ON DELETE CASCADE)";
- // Locking: subscription
- // Dependents: message
private static final String CREATE_GROUPS =
"CREATE TABLE groups"
+ " (groupId HASH NOT NULL,"
@@ -107,7 +101,6 @@ abstract class JdbcDatabase implements Database {
+ " visibleToAll BOOLEAN NOT NULL,"
+ " PRIMARY KEY (groupId))";
- // Locking: subscription
private static final String CREATE_GROUP_VISIBILITIES =
"CREATE TABLE groupVisibilities"
+ " (contactId INT NOT NULL,"
@@ -121,7 +114,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES groups (groupId)"
+ " ON DELETE CASCADE)";
- // Locking: subscription
private static final String CREATE_CONTACT_GROUPS =
"CREATE TABLE contactGroups"
+ " (contactId INT NOT NULL,"
@@ -133,7 +125,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
- // Locking: subscription
private static final String CREATE_GROUP_VERSIONS =
"CREATE TABLE groupVersions"
+ " (contactId INT NOT NULL,"
@@ -148,7 +139,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
- // Locking: message
private static final String CREATE_MESSAGES =
"CREATE TABLE messages"
+ " (messageId HASH NOT NULL,"
@@ -182,7 +172,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
- // Locking: message
private static final String CREATE_STATUSES =
"CREATE TABLE statuses"
+ " (messageId HASH NOT NULL,"
@@ -206,7 +195,6 @@ abstract class JdbcDatabase implements Database {
private static final String INDEX_STATUSES_BY_CONTACT =
"CREATE INDEX statusesByContact ON statuses (contactId)";
- // Locking: retention
private static final String CREATE_RETENTION_VERSIONS =
"CREATE TABLE retentionVersions"
+ " (contactId INT NOT NULL,"
@@ -222,15 +210,12 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
- // Locking: transport
- // Dependents: window
private static final String CREATE_TRANSPORTS =
"CREATE TABLE transports"
+ " (transportId VARCHAR NOT NULL,"
+ " maxLatency BIGINT NOT NULL,"
+ " PRIMARY KEY (transportId))";
- // Locking: transport
private static final String CREATE_TRANSPORT_CONFIGS =
"CREATE TABLE transportConfigs"
+ " (transportId VARCHAR NOT NULL,"
@@ -241,7 +226,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
- // Locking: transport
private static final String CREATE_TRANSPORT_PROPS =
"CREATE TABLE transportProperties"
+ " (transportId VARCHAR NOT NULL,"
@@ -252,7 +236,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
- // Locking: transport
private static final String CREATE_TRANSPORT_VERSIONS =
"CREATE TABLE transportVersions"
+ " (contactId INT NOT NULL,"
@@ -269,7 +252,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
- // Locking: transport
private static final String CREATE_CONTACT_TRANSPORT_PROPS =
"CREATE TABLE contactTransportProperties"
+ " (contactId INT NOT NULL,"
@@ -281,7 +263,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
- // Locking: transport
private static final String CREATE_CONTACT_TRANSPORT_VERSIONS =
"CREATE TABLE contactTransportVersions"
+ " (contactId INT NOT NULL,"
@@ -293,7 +274,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
- // Locking: window
private static final String CREATE_ENDPOINTS =
"CREATE TABLE endpoints"
+ " (contactId INT NOT NULL,"
@@ -308,7 +288,6 @@ abstract class JdbcDatabase implements Database {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
- // Locking: window
private static final String CREATE_SECRETS =
"CREATE TABLE secrets"
+ " (contactId INT NOT NULL,"
@@ -1098,8 +1077,8 @@ abstract class JdbcDatabase implements Database {
}
}
- public boolean containsVisibleGroup(Connection txn, ContactId c,
- GroupId g) throws DbException {
+ public boolean containsVisibleGroup(Connection txn, ContactId c, GroupId g)
+ throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
diff --git a/briar-core/src/org/briarproject/lifecycle/LifecycleModule.java b/briar-core/src/org/briarproject/lifecycle/LifecycleModule.java
index 8c6c7360f..0714a0555 100644
--- a/briar-core/src/org/briarproject/lifecycle/LifecycleModule.java
+++ b/briar-core/src/org/briarproject/lifecycle/LifecycleModule.java
@@ -1,18 +1,49 @@
package org.briarproject.lifecycle;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
import javax.inject.Singleton;
+import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.lifecycle.ShutdownManager;
import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
public class LifecycleModule extends AbstractModule {
+ private final ExecutorService ioExecutor;
+
+ public LifecycleModule() {
+ // The thread pool is unbounded, so use direct handoff
+ BlockingQueue queue = new SynchronousQueue();
+ // Discard tasks that are submitted during shutdown
+ RejectedExecutionHandler policy =
+ new ThreadPoolExecutor.DiscardPolicy();
+ // Create threads as required and keep them in the pool for 60 seconds
+ ioExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60, SECONDS, queue, policy);
+ }
+
+ @Override
protected void configure() {
bind(LifecycleManager.class).to(
LifecycleManagerImpl.class).in(Singleton.class);
bind(ShutdownManager.class).to(
ShutdownManagerImpl.class).in(Singleton.class);
}
+
+ @Provides @Singleton @IoExecutor
+ Executor getIoExecutor(LifecycleManager lifecycleManager) {
+ lifecycleManager.registerForShutdown(ioExecutor);
+ return ioExecutor;
+ }
}
diff --git a/briar-core/src/org/briarproject/messaging/duplex/DuplexConnection.java b/briar-core/src/org/briarproject/messaging/duplex/DuplexConnection.java
index 56ec0aa93..de6d569a5 100644
--- a/briar-core/src/org/briarproject/messaging/duplex/DuplexConnection.java
+++ b/briar-core/src/org/briarproject/messaging/duplex/DuplexConnection.java
@@ -276,7 +276,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveAck implements Runnable {
private final Ack ack;
@@ -315,7 +315,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveMessage implements Runnable {
private final Message message;
@@ -334,7 +334,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveOffer implements Runnable {
private final Offer offer;
@@ -353,7 +353,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveRequest implements Runnable {
private final Request request;
@@ -372,7 +372,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
@@ -391,7 +391,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
@@ -410,7 +410,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
@@ -429,7 +429,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveSubscriptionUpdate implements Runnable {
private final SubscriptionUpdate update;
@@ -448,7 +448,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
@@ -467,7 +467,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update;
@@ -486,7 +486,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateAck implements Runnable {
public void run() {
@@ -525,7 +525,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateBatch implements Runnable {
public void run() {
@@ -564,7 +564,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateOffer implements Runnable {
public void run() {
@@ -603,7 +603,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateRequest implements Runnable {
public void run() {
@@ -642,7 +642,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateRetentionAck implements Runnable {
public void run() {
@@ -679,7 +679,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateRetentionUpdate implements Runnable {
public void run() {
@@ -717,7 +717,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateSubscriptionAck implements Runnable {
public void run() {
@@ -754,7 +754,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
@@ -792,7 +792,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
@@ -830,7 +830,7 @@ abstract class DuplexConnection implements EventListener {
}
}
- // This task runs on a database thread
+ // This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
diff --git a/briar-core/src/org/briarproject/plugins/PluginManagerImpl.java b/briar-core/src/org/briarproject/plugins/PluginManagerImpl.java
index 3cdfbe9a9..722897476 100644
--- a/briar-core/src/org/briarproject/plugins/PluginManagerImpl.java
+++ b/briar-core/src/org/briarproject/plugins/PluginManagerImpl.java
@@ -23,9 +23,9 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
+import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback;
-import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
@@ -49,7 +49,7 @@ class PluginManagerImpl implements PluginManager {
private static final Logger LOG =
Logger.getLogger(PluginManagerImpl.class.getName());
- private final Executor pluginExecutor;
+ private final Executor ioExecutor;
private final SimplexPluginConfig simplexPluginConfig;
private final DuplexPluginConfig duplexPluginConfig;
private final Clock clock;
@@ -62,12 +62,12 @@ class PluginManagerImpl implements PluginManager {
private final List duplexPlugins;
@Inject
- PluginManagerImpl(@PluginExecutor Executor pluginExecutor,
+ PluginManagerImpl(@IoExecutor Executor ioExecutor,
SimplexPluginConfig simplexPluginConfig,
DuplexPluginConfig duplexPluginConfig, Clock clock,
DatabaseComponent db, Poller poller,
ConnectionDispatcher dispatcher, UiCallback uiCallback) {
- this.pluginExecutor = pluginExecutor;
+ this.ioExecutor = ioExecutor;
this.simplexPluginConfig = simplexPluginConfig;
this.duplexPluginConfig = duplexPluginConfig;
this.clock = clock;
@@ -87,14 +87,14 @@ class PluginManagerImpl implements PluginManager {
simplexPluginConfig.getFactories();
final CountDownLatch sLatch = new CountDownLatch(sFactories.size());
for(SimplexPluginFactory factory : sFactories)
- pluginExecutor.execute(new SimplexPluginStarter(factory, sLatch));
+ ioExecutor.execute(new SimplexPluginStarter(factory, sLatch));
// Instantiate and start the duplex plugins
LOG.info("Starting duplex plugins");
Collection dFactories =
duplexPluginConfig.getFactories();
final CountDownLatch dLatch = new CountDownLatch(dFactories.size());
for(DuplexPluginFactory factory : dFactories)
- pluginExecutor.execute(new DuplexPluginStarter(factory, dLatch));
+ ioExecutor.execute(new DuplexPluginStarter(factory, dLatch));
// Wait for the plugins to start
try {
sLatch.await();
@@ -119,11 +119,11 @@ class PluginManagerImpl implements PluginManager {
// Stop the simplex plugins
LOG.info("Stopping simplex plugins");
for(SimplexPlugin plugin : simplexPlugins)
- pluginExecutor.execute(new PluginStopper(plugin, latch));
+ ioExecutor.execute(new PluginStopper(plugin, latch));
// Stop the duplex plugins
LOG.info("Stopping duplex plugins");
for(DuplexPlugin plugin : duplexPlugins)
- pluginExecutor.execute(new PluginStopper(plugin, latch));
+ ioExecutor.execute(new PluginStopper(plugin, latch));
plugins.clear();
simplexPlugins.clear();
duplexPlugins.clear();
diff --git a/briar-core/src/org/briarproject/plugins/PluginsModule.java b/briar-core/src/org/briarproject/plugins/PluginsModule.java
index f905d2c32..aca1ef52c 100644
--- a/briar-core/src/org/briarproject/plugins/PluginsModule.java
+++ b/briar-core/src/org/briarproject/plugins/PluginsModule.java
@@ -1,18 +1,8 @@
package org.briarproject.plugins;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-
import javax.inject.Singleton;
import org.briarproject.api.lifecycle.LifecycleManager;
-import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.PluginManager;
import com.google.inject.AbstractModule;
@@ -20,19 +10,7 @@ import com.google.inject.Provides;
public class PluginsModule extends AbstractModule {
- private final ExecutorService pluginExecutor;
-
- public PluginsModule() {
- // The thread pool is unbounded, so use direct handoff
- BlockingQueue queue = new SynchronousQueue();
- // Discard tasks that are submitted during shutdown
- RejectedExecutionHandler policy =
- new ThreadPoolExecutor.DiscardPolicy();
- // Create threads as required and keep them in the pool for 60 seconds
- pluginExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60, SECONDS, queue, policy);
- }
-
+ @Override
protected void configure() {
bind(Poller.class).to(PollerImpl.class);
}
@@ -43,10 +21,4 @@ public class PluginsModule extends AbstractModule {
lifecycleManager.register(pluginManager);
return pluginManager;
}
-
- @Provides @Singleton @PluginExecutor
- Executor getPluginExecutor(LifecycleManager lifecycleManager) {
- lifecycleManager.registerForShutdown(pluginExecutor);
- return pluginExecutor;
- }
}
diff --git a/briar-core/src/org/briarproject/plugins/PollerImpl.java b/briar-core/src/org/briarproject/plugins/PollerImpl.java
index 6d331877b..a92cdf4cd 100644
--- a/briar-core/src/org/briarproject/plugins/PollerImpl.java
+++ b/briar-core/src/org/briarproject/plugins/PollerImpl.java
@@ -9,8 +9,8 @@ import java.util.logging.Logger;
import javax.inject.Inject;
+import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.Plugin;
-import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.ConnectionRegistry;
@@ -19,14 +19,14 @@ class PollerImpl implements Poller {
private static final Logger LOG =
Logger.getLogger(PollerImpl.class.getName());
- private final Executor pluginExecutor;
+ private final Executor ioExecutor;
private final ConnectionRegistry connRegistry;
private final Timer timer;
@Inject
- PollerImpl(@PluginExecutor Executor pluginExecutor,
- ConnectionRegistry connRegistry, Timer timer) {
- this.pluginExecutor = pluginExecutor;
+ PollerImpl(@IoExecutor Executor ioExecutor, ConnectionRegistry connRegistry,
+ Timer timer) {
+ this.ioExecutor = ioExecutor;
this.connRegistry = connRegistry;
this.timer = timer;
}
@@ -49,7 +49,7 @@ class PollerImpl implements Poller {
}
public void pollNow(final Plugin p) {
- pluginExecutor.execute(new Runnable() {
+ ioExecutor.execute(new Runnable() {
public void run() {
if(LOG.isLoggable(INFO))
LOG.info("Polling " + p.getClass().getSimpleName());
@@ -66,6 +66,7 @@ class PollerImpl implements Poller {
this.plugin = plugin;
}
+ @Override
public void run() {
pollNow(plugin);
schedule(plugin, false);
diff --git a/briar-core/src/org/briarproject/plugins/file/FilePlugin.java b/briar-core/src/org/briarproject/plugins/file/FilePlugin.java
index 3e5854524..60628f33e 100644
--- a/briar-core/src/org/briarproject/plugins/file/FilePlugin.java
+++ b/briar-core/src/org/briarproject/plugins/file/FilePlugin.java
@@ -25,7 +25,7 @@ public abstract class FilePlugin implements SimplexPlugin {
private static final Logger LOG =
Logger.getLogger(FilePlugin.class.getName());
- protected final Executor pluginExecutor;
+ protected final Executor ioExecutor;
protected final FileUtils fileUtils;
protected final SimplexPluginCallback callback;
protected final int maxFrameLength;
@@ -38,10 +38,10 @@ public abstract class FilePlugin implements SimplexPlugin {
protected abstract void writerFinished(File f);
protected abstract void readerFinished(File f);
- protected FilePlugin(Executor pluginExecutor, FileUtils fileUtils,
+ protected FilePlugin(Executor ioExecutor, FileUtils fileUtils,
SimplexPluginCallback callback, int maxFrameLength,
long maxLatency) {
- this.pluginExecutor = pluginExecutor;
+ this.ioExecutor = ioExecutor;
this.fileUtils = fileUtils;
this.callback = callback;
this.maxFrameLength = maxFrameLength;
@@ -100,7 +100,7 @@ public abstract class FilePlugin implements SimplexPlugin {
protected void createReaderFromFile(final File f) {
if(!running) return;
- pluginExecutor.execute(new ReaderCreator(f));
+ ioExecutor.execute(new ReaderCreator(f));
}
private class ReaderCreator implements Runnable {
diff --git a/briar-core/src/org/briarproject/plugins/tcp/LanTcpPlugin.java b/briar-core/src/org/briarproject/plugins/tcp/LanTcpPlugin.java
index d4d3c5062..39e4b9418 100644
--- a/briar-core/src/org/briarproject/plugins/tcp/LanTcpPlugin.java
+++ b/briar-core/src/org/briarproject/plugins/tcp/LanTcpPlugin.java
@@ -16,9 +16,9 @@ class LanTcpPlugin extends TcpPlugin {
static final TransportId ID = new TransportId("lan");
- LanTcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
+ LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
- super(pluginExecutor, callback, maxFrameLength, maxLatency,
+ super(ioExecutor, callback, maxFrameLength, maxLatency,
pollingInterval);
}
diff --git a/briar-core/src/org/briarproject/plugins/tcp/LanTcpPluginFactory.java b/briar-core/src/org/briarproject/plugins/tcp/LanTcpPluginFactory.java
index eb24e4550..02b53eab9 100644
--- a/briar-core/src/org/briarproject/plugins/tcp/LanTcpPluginFactory.java
+++ b/briar-core/src/org/briarproject/plugins/tcp/LanTcpPluginFactory.java
@@ -13,10 +13,10 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute
- private final Executor pluginExecutor;
+ private final Executor ioExecutor;
- public LanTcpPluginFactory(Executor pluginExecutor) {
- this.pluginExecutor = pluginExecutor;
+ public LanTcpPluginFactory(Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
}
public TransportId getId() {
@@ -24,7 +24,7 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
- return new LanTcpPlugin(pluginExecutor, callback, MAX_FRAME_LENGTH,
+ return new LanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL);
}
}
diff --git a/briar-core/src/org/briarproject/plugins/tcp/TcpPlugin.java b/briar-core/src/org/briarproject/plugins/tcp/TcpPlugin.java
index c5de9d6c8..a8d3e8dde 100644
--- a/briar-core/src/org/briarproject/plugins/tcp/TcpPlugin.java
+++ b/briar-core/src/org/briarproject/plugins/tcp/TcpPlugin.java
@@ -35,7 +35,7 @@ abstract class TcpPlugin implements DuplexPlugin {
private static final Logger LOG =
Logger.getLogger(TcpPlugin.class.getName());
- protected final Executor pluginExecutor;
+ protected final Executor ioExecutor;
protected final DuplexPluginCallback callback;
protected final int maxFrameLength;
protected final long maxLatency, pollingInterval;
@@ -52,9 +52,9 @@ abstract class TcpPlugin implements DuplexPlugin {
/** Returns true if connections to the given address can be attempted. */
protected abstract boolean isConnectable(InetSocketAddress remote);
- protected TcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
+ protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) {
- this.pluginExecutor = pluginExecutor;
+ this.ioExecutor = ioExecutor;
this.callback = callback;
this.maxFrameLength = maxFrameLength;
this.maxLatency = maxLatency;
@@ -76,7 +76,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
protected void bind() {
- pluginExecutor.execute(new Runnable() {
+ ioExecutor.execute(new Runnable() {
public void run() {
if(!running) return;
ServerSocket ss = null;
@@ -172,7 +172,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
private void connectAndCallBack(final ContactId c) {
- pluginExecutor.execute(new Runnable() {
+ ioExecutor.execute(new Runnable() {
public void run() {
DuplexTransportConnection d = createConnection(c);
if(d != null) callback.outgoingConnectionCreated(c, d);
diff --git a/briar-core/src/org/briarproject/plugins/tcp/WanTcpPlugin.java b/briar-core/src/org/briarproject/plugins/tcp/WanTcpPlugin.java
index 7a403b636..6e2e3e754 100644
--- a/briar-core/src/org/briarproject/plugins/tcp/WanTcpPlugin.java
+++ b/briar-core/src/org/briarproject/plugins/tcp/WanTcpPlugin.java
@@ -20,10 +20,10 @@ class WanTcpPlugin extends TcpPlugin {
private volatile MappingResult mappingResult;
- WanTcpPlugin(Executor pluginExecutor, DuplexPluginCallback callback,
+ WanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval,
PortMapper portMapper) {
- super(pluginExecutor, callback, maxFrameLength, maxLatency,
+ super(ioExecutor, callback, maxFrameLength, maxLatency,
pollingInterval);
this.portMapper = portMapper;
}
diff --git a/briar-core/src/org/briarproject/plugins/tcp/WanTcpPluginFactory.java b/briar-core/src/org/briarproject/plugins/tcp/WanTcpPluginFactory.java
index a6de80173..f478bbc29 100644
--- a/briar-core/src/org/briarproject/plugins/tcp/WanTcpPluginFactory.java
+++ b/briar-core/src/org/briarproject/plugins/tcp/WanTcpPluginFactory.java
@@ -14,12 +14,12 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes
- private final Executor pluginExecutor;
+ private final Executor ioExecutor;
private final ShutdownManager shutdownManager;
- public WanTcpPluginFactory(Executor pluginExecutor,
+ public WanTcpPluginFactory(Executor ioExecutor,
ShutdownManager shutdownManager) {
- this.pluginExecutor = pluginExecutor;
+ this.ioExecutor = ioExecutor;
this.shutdownManager = shutdownManager;
}
@@ -28,7 +28,7 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
}
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
- return new WanTcpPlugin(pluginExecutor, callback, MAX_FRAME_LENGTH,
+ return new WanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL,
new PortMapperImpl(shutdownManager));
}
diff --git a/briar-core/src/org/briarproject/reliability/ReliabilityLayerFactoryImpl.java b/briar-core/src/org/briarproject/reliability/ReliabilityLayerFactoryImpl.java
index 0cb3716cb..7aee1db75 100644
--- a/briar-core/src/org/briarproject/reliability/ReliabilityLayerFactoryImpl.java
+++ b/briar-core/src/org/briarproject/reliability/ReliabilityLayerFactoryImpl.java
@@ -4,7 +4,7 @@ import java.util.concurrent.Executor;
import javax.inject.Inject;
-import org.briarproject.api.reliability.ReliabilityExecutor;
+import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.reliability.ReliabilityLayer;
import org.briarproject.api.reliability.ReliabilityLayerFactory;
import org.briarproject.api.reliability.WriteHandler;
@@ -13,16 +13,16 @@ import org.briarproject.system.SystemClock;
class ReliabilityLayerFactoryImpl implements ReliabilityLayerFactory {
- private final Executor executor;
+ private final Executor ioExecutor;
private final Clock clock;
@Inject
- ReliabilityLayerFactoryImpl(@ReliabilityExecutor Executor executor) {
- this.executor = executor;
+ ReliabilityLayerFactoryImpl(@IoExecutor Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
clock = new SystemClock();
}
public ReliabilityLayer createReliabilityLayer(WriteHandler writeHandler) {
- return new ReliabilityLayerImpl(executor, clock, writeHandler);
+ return new ReliabilityLayerImpl(ioExecutor, clock, writeHandler);
}
}
diff --git a/briar-core/src/org/briarproject/reliability/ReliabilityModule.java b/briar-core/src/org/briarproject/reliability/ReliabilityModule.java
index 691b52cb7..aabd18e87 100644
--- a/briar-core/src/org/briarproject/reliability/ReliabilityModule.java
+++ b/briar-core/src/org/briarproject/reliability/ReliabilityModule.java
@@ -1,46 +1,14 @@
package org.briarproject.reliability;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import javax.inject.Singleton;
-
-import org.briarproject.api.lifecycle.LifecycleManager;
-import org.briarproject.api.reliability.ReliabilityExecutor;
import org.briarproject.api.reliability.ReliabilityLayerFactory;
import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
public class ReliabilityModule extends AbstractModule {
- private final ExecutorService reliabilityExecutor;
-
- public ReliabilityModule() {
- // The thread pool is unbounded, so use direct handoff
- BlockingQueue queue = new SynchronousQueue();
- // Discard tasks that are submitted during shutdown
- RejectedExecutionHandler policy =
- new ThreadPoolExecutor.DiscardPolicy();
- // Create threads as required and keep them in the pool for 60 seconds
- reliabilityExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60, SECONDS, queue, policy);
- }
-
+ @Override
protected void configure() {
bind(ReliabilityLayerFactory.class).to(
ReliabilityLayerFactoryImpl.class);
}
-
- @Provides @Singleton @ReliabilityExecutor
- Executor getReliabilityExecutor(LifecycleManager lifecycleManager) {
- lifecycleManager.registerForShutdown(reliabilityExecutor);
- return reliabilityExecutor;
- }
}
diff --git a/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java b/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java
index f37495e9f..9844ad30e 100644
--- a/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java
+++ b/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java
@@ -14,6 +14,7 @@ import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DbException;
+import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.messaging.duplex.DuplexConnectionFactory;
import org.briarproject.api.messaging.simplex.SimplexConnectionFactory;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
@@ -22,31 +23,30 @@ import org.briarproject.api.plugins.simplex.SimplexTransportWriter;
import org.briarproject.api.transport.ConnectionContext;
import org.briarproject.api.transport.ConnectionDispatcher;
import org.briarproject.api.transport.ConnectionRecogniser;
-import org.briarproject.api.transport.IncomingConnectionExecutor;
class ConnectionDispatcherImpl implements ConnectionDispatcher {
private static final Logger LOG =
Logger.getLogger(ConnectionDispatcherImpl.class.getName());
- private final Executor connExecutor;
+ private final Executor ioExecutor;
private final ConnectionRecogniser recogniser;
private final SimplexConnectionFactory simplexConnFactory;
private final DuplexConnectionFactory duplexConnFactory;
@Inject
- ConnectionDispatcherImpl(@IncomingConnectionExecutor Executor connExecutor,
+ ConnectionDispatcherImpl(@IoExecutor Executor ioExecutor,
ConnectionRecogniser recogniser,
SimplexConnectionFactory simplexConnFactory,
DuplexConnectionFactory duplexConnFactory) {
- this.connExecutor = connExecutor;
+ this.ioExecutor = ioExecutor;
this.recogniser = recogniser;
this.simplexConnFactory = simplexConnFactory;
this.duplexConnFactory = duplexConnFactory;
}
public void dispatchReader(TransportId t, SimplexTransportReader r) {
- connExecutor.execute(new DispatchSimplexConnection(t, r));
+ ioExecutor.execute(new DispatchSimplexConnection(t, r));
}
public void dispatchWriter(ContactId c, TransportId t,
@@ -56,7 +56,7 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
public void dispatchIncomingConnection(TransportId t,
DuplexTransportConnection d) {
- connExecutor.execute(new DispatchDuplexConnection(t, d));
+ ioExecutor.execute(new DispatchDuplexConnection(t, d));
}
public void dispatchOutgoingConnection(ContactId c, TransportId t,
diff --git a/briar-core/src/org/briarproject/transport/TransportModule.java b/briar-core/src/org/briarproject/transport/TransportModule.java
index b53c7f7c1..3fec3a806 100644
--- a/briar-core/src/org/briarproject/transport/TransportModule.java
+++ b/briar-core/src/org/briarproject/transport/TransportModule.java
@@ -1,14 +1,5 @@
package org.briarproject.transport;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-
import javax.inject.Singleton;
import org.briarproject.api.crypto.KeyManager;
@@ -18,26 +9,13 @@ import org.briarproject.api.transport.ConnectionReaderFactory;
import org.briarproject.api.transport.ConnectionRecogniser;
import org.briarproject.api.transport.ConnectionRegistry;
import org.briarproject.api.transport.ConnectionWriterFactory;
-import org.briarproject.api.transport.IncomingConnectionExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class TransportModule extends AbstractModule {
- private final ExecutorService incomingConnectionExecutor;
-
- public TransportModule() {
- // The thread pool is unbounded, so use direct handoff
- BlockingQueue queue = new SynchronousQueue();
- // Discard tasks that are submitted during shutdown
- RejectedExecutionHandler policy =
- new ThreadPoolExecutor.DiscardPolicy();
- // Create threads as required and keep them in the pool for 60 seconds
- incomingConnectionExecutor = new ThreadPoolExecutor(0,
- Integer.MAX_VALUE, 60, SECONDS, queue, policy);
- }
-
+ @Override
protected void configure() {
bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class);
bind(ConnectionReaderFactory.class).to(
@@ -55,10 +33,4 @@ public class TransportModule extends AbstractModule {
lifecycleManager.register(keyManager);
return keyManager;
}
-
- @Provides @Singleton @IncomingConnectionExecutor
- Executor getIncomingConnectionExecutor(LifecycleManager lifecycleManager) {
- lifecycleManager.registerForShutdown(incomingConnectionExecutor);
- return incomingConnectionExecutor;
- }
}
diff --git a/briar-desktop/src/org/briarproject/lifecycle/DesktopLifecycleModule.java b/briar-desktop/src/org/briarproject/lifecycle/DesktopLifecycleModule.java
index 8395bc8cf..ea12c7098 100644
--- a/briar-desktop/src/org/briarproject/lifecycle/DesktopLifecycleModule.java
+++ b/briar-desktop/src/org/briarproject/lifecycle/DesktopLifecycleModule.java
@@ -4,21 +4,20 @@ import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.lifecycle.ShutdownManager;
import org.briarproject.util.OsUtils;
-import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
-public class DesktopLifecycleModule extends AbstractModule {
+public class DesktopLifecycleModule extends LifecycleModule {
+ @Override
protected void configure() {
bind(LifecycleManager.class).to(
LifecycleManagerImpl.class).in(Singleton.class);
if(OsUtils.isWindows()) {
bind(ShutdownManager.class).to(
- WindowsShutdownManagerImpl.class).in(
- Singleton.class);
+ WindowsShutdownManagerImpl.class).in(Singleton.class);
} else {
bind(ShutdownManager.class).to(
- ShutdownManagerImpl.class).in(Singleton.class);
+ ShutdownManagerImpl.class).in(Singleton.class);
}
}
}
diff --git a/briar-desktop/src/org/briarproject/plugins/DesktopPluginsModule.java b/briar-desktop/src/org/briarproject/plugins/DesktopPluginsModule.java
index 021b80301..58be0ce51 100644
--- a/briar-desktop/src/org/briarproject/plugins/DesktopPluginsModule.java
+++ b/briar-desktop/src/org/briarproject/plugins/DesktopPluginsModule.java
@@ -5,8 +5,8 @@ import java.util.Collection;
import java.util.concurrent.Executor;
import org.briarproject.api.crypto.CryptoComponent;
+import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.lifecycle.ShutdownManager;
-import org.briarproject.api.plugins.PluginExecutor;
import org.briarproject.api.plugins.duplex.DuplexPluginConfig;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import org.briarproject.api.plugins.simplex.SimplexPluginConfig;
@@ -19,18 +19,15 @@ import org.briarproject.plugins.modem.ModemPluginFactory;
import org.briarproject.plugins.tcp.LanTcpPluginFactory;
import org.briarproject.plugins.tcp.WanTcpPluginFactory;
-import com.google.inject.AbstractModule;
import com.google.inject.Provides;
-public class DesktopPluginsModule extends AbstractModule {
-
- public void configure() {}
+public class DesktopPluginsModule extends PluginsModule {
@Provides
- SimplexPluginConfig getSimplexPluginConfig(
- @PluginExecutor Executor pluginExecutor, FileUtils fileUtils) {
+ SimplexPluginConfig getSimplexPluginConfig(@IoExecutor Executor ioExecutor,
+ FileUtils fileUtils) {
SimplexPluginFactory removable =
- new RemovableDrivePluginFactory(pluginExecutor, fileUtils);
+ new RemovableDrivePluginFactory(ioExecutor, fileUtils);
final Collection factories =
Arrays.asList(removable);
return new SimplexPluginConfig() {
@@ -41,16 +38,15 @@ public class DesktopPluginsModule extends AbstractModule {
}
@Provides
- DuplexPluginConfig getDuplexPluginConfig(
- @PluginExecutor Executor pluginExecutor,
+ DuplexPluginConfig getDuplexPluginConfig(@IoExecutor Executor ioExecutor,
CryptoComponent crypto, ReliabilityLayerFactory reliabilityFactory,
ShutdownManager shutdownManager) {
DuplexPluginFactory bluetooth = new BluetoothPluginFactory(
- pluginExecutor, crypto.getSecureRandom());
- DuplexPluginFactory modem = new ModemPluginFactory(pluginExecutor,
+ ioExecutor, crypto.getSecureRandom());
+ DuplexPluginFactory modem = new ModemPluginFactory(ioExecutor,
reliabilityFactory);
- DuplexPluginFactory lan = new LanTcpPluginFactory(pluginExecutor);
- DuplexPluginFactory wan = new WanTcpPluginFactory(pluginExecutor,
+ DuplexPluginFactory lan = new LanTcpPluginFactory(ioExecutor);
+ DuplexPluginFactory wan = new WanTcpPluginFactory(ioExecutor,
shutdownManager);
final Collection