Fixed inconsistent locking in database.

Previously, when table A had a foreign key pointing to table B, we got
read locks on A and B to read A, a write lock on A and a read lock on
B to update A, and a write lock on B to update B (but this wasn't
applied consistently). Now we get a read lock on A to read A, a write
lock on A to update A, and write locks on A and B to update B. The
difference is small in practice, but clarifying the rules has helped to
catch some bugs.
This commit is contained in:
akwizgran
2013-02-28 17:00:55 +00:00
parent 43c8cfa248
commit 7ed85c62c3
4 changed files with 247 additions and 250 deletions

View File

@@ -90,7 +90,7 @@ interface Database<T> {
/**
* Adds an endpoint to the database.
* <p>
* Locking: contact read, transport read, window write.
* Locking: window write.
*/
void addEndpoint(T txn, Endpoint ep) throws DbException;
@@ -105,7 +105,7 @@ interface Database<T> {
/**
* Records a received message as needing to be acknowledged.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException;
@@ -113,7 +113,7 @@ interface Database<T> {
* Stores the given message, or returns false if the message is already in
* the database.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
boolean addPrivateMessage(T txn, Message m, ContactId c) throws DbException;
@@ -121,7 +121,7 @@ interface Database<T> {
* Stores the given temporary secrets and deletes any secrets that have
* been made obsolete.
* <p>
* Locking: contact read, transport read, window write.
* Locking: window write.
*/
void addSecrets(T txn, Collection<TemporarySecret> secrets)
throws DbException;
@@ -130,7 +130,7 @@ interface Database<T> {
* Initialises the status (seen or unseen) of the given message with
* respect to the given contact.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
void addStatus(T txn, ContactId c, MessageId m, boolean seen)
throws DbException;
@@ -147,14 +147,14 @@ interface Database<T> {
* Adds a new transport to the database and returns true if the transport
* was not previously in the database.
* <p>
* Locking: transport write.
* Locking: transport write, window write.
*/
boolean addTransport(T txn, TransportId t) throws DbException;
/**
* Makes the given group visible to the given contact.
* <p>
* Locking: contact write, subscription write.
* Locking: subscription write.
*/
void addVisibility(T txn, ContactId c, GroupId g) throws DbException;
@@ -182,7 +182,7 @@ interface Database<T> {
/**
* Returns true if the database contains the given transport.
* <p>
* Locking: contact read, transport read.
* Locking: transport read.
*/
boolean containsTransport(T txn, TransportId t) throws DbException;
@@ -190,7 +190,7 @@ interface Database<T> {
* Returns true if the user subscribes to the given group and the
* subscription is visible to the given contact.
* <p>
* Locking: contact read, subscription read.
* Locking: subscription read.
*/
boolean containsVisibleSubscription(T txn, ContactId c, GroupId g)
throws DbException;
@@ -219,7 +219,7 @@ interface Database<T> {
/**
* Returns all endpoints.
* <p>
* Locking: contact read, transport read, window read.
* Locking: window read.
*/
Collection<Endpoint> getEndpoints(T txn) throws DbException;
@@ -227,8 +227,6 @@ interface Database<T> {
* Returns the amount of free storage space available to the database, in
* bytes. This is based on the minimum of the space available on the device
* where the database is stored and the database's configured size.
* <p>
* Locking: message read.
*/
long getFreeSpace() throws DbException;
@@ -245,7 +243,7 @@ interface Database<T> {
* Returns the time at which a connection to the given contact was last
* made.
* <p>
* Locking: contact read, window read.
* Locking: window read.
*/
long getLastConnected(T txn, ContactId c) throws DbException;
@@ -292,7 +290,7 @@ interface Database<T> {
* Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages.
* <p>
* Locking: contact read, message read.
* Locking: message read.
*/
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
@@ -301,7 +299,7 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given number of messages.
* <p>
* Locking: contact read, message read, subscription read.
* Locking: message read, subscription read.
*/
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -327,7 +325,7 @@ interface Database<T> {
* Returns null if the message is not present in the database or is not
* sendable to the given contact.
* <p>
* Locking: contact read, message read, subscription read.
* Locking: message read, subscription read.
*/
byte[] getRawMessageIfSendable(T txn, ContactId c, MessageId m)
throws DbException;
@@ -357,7 +355,7 @@ interface Database<T> {
/**
* Returns all remote properties for the given transport.
* <p>
* Locking: contact read, transport read.
* Locking: transport read.
*/
Map<ContactId, TransportProperties> getRemoteProperties(T txn,
TransportId t) throws DbException;
@@ -365,7 +363,7 @@ interface Database<T> {
/**
* Returns a retention ack for the given contact, or null if no ack is due.
* <p>
* Locking: contact read, retention write.
* Locking: retention write.
*/
RetentionAck getRetentionAck(T txn, ContactId c) throws DbException;
@@ -373,7 +371,7 @@ interface Database<T> {
* Returns a retention update for the given contact and updates its expiry
* time using the given latency. Returns null if no update is due.
* <p>
* Locking: contact read, retention write.
* Locking: message read, retention write.
*/
RetentionUpdate getRetentionUpdate(T txn, ContactId c, long maxLatency)
throws DbException;
@@ -381,7 +379,7 @@ interface Database<T> {
/**
* Returns all temporary secrets.
* <p>
* Locking: contact read, transport read, window read.
* Locking: window read.
*/
Collection<TemporarySecret> getSecrets(T txn) throws DbException;
@@ -397,7 +395,7 @@ interface Database<T> {
* given contact, with a total length less than or equal to the given
* length.
* <p>
* Locking: contact read, message read, subscription read.
* Locking: message read, subscription read.
*/
Collection<MessageId> getSendableMessages(T txn, ContactId c, int maxLength)
throws DbException;
@@ -419,7 +417,7 @@ interface Database<T> {
/**
* Returns the groups to which the given contact subscribes.
* <p>
* Locking: contact read, subscription read.
* Locking: subscription read.
*/
Collection<Group> getSubscriptions(T txn, ContactId c) throws DbException;
@@ -427,7 +425,7 @@ interface Database<T> {
* Returns a subscription ack for the given contact, or null if no ack is
* due.
* <p>
* Locking: contact read, subscription write.
* Locking: subscription write.
*/
SubscriptionAck getSubscriptionAck(T txn, ContactId c) throws DbException;
@@ -435,7 +433,7 @@ interface Database<T> {
* Returns a subscription update for the given contact and updates its
* expiry time using the given latency. Returns null if no update is due.
* <p>
* Locking: contact read, subscription write.
* Locking: subscription write.
*/
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c,
long maxLatency) throws DbException;
@@ -444,7 +442,7 @@ interface Database<T> {
* Returns the transmission count of the given message with respect to the
* given contact.
* <p>
* Locking: contact read, message read.
* Locking: message read.
*/
int getTransmissionCount(T txn, ContactId c, MessageId m)
throws DbException;
@@ -453,7 +451,7 @@ interface Database<T> {
* Returns a collection of transport acks for the given contact, or null if
* no acks are due.
* <p>
* Locking: contact read, transport write.
* Locking: transport write.
*/
Collection<TransportAck> getTransportAcks(T txn, ContactId c)
throws DbException;
@@ -463,31 +461,29 @@ interface Database<T> {
* updates their expiry times using the given latency. Returns null if no
* updates are due.
* <p>
* Locking: contact read, transport write.
* Locking: transport write.
*/
Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c,
long maxLatency) throws DbException;
/**
* Returns the version number of the
/**
* Returns the number of unread messages in each subscribed group.
* <p>
* Locking: message read, subscription read.
* Locking: message read.
*/
Map<GroupId, Integer> getUnreadMessageCounts(T txn) throws DbException;
/**
* Returns the contacts to which the given group is visible.
* <p>
* Locking: contact read, subscription read.
* Locking: subscription read.
*/
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
/**
* Returns the subscriptions that are visible to the given contact.
* <p>
* Locking: contact read, subscription read.
* Locking: subscription read.
*/
Collection<GroupId> getVisibleSubscriptions(T txn, ContactId c)
throws DbException;
@@ -495,7 +491,7 @@ interface Database<T> {
/**
* Returns true if any messages are sendable to the given contact.
* <p>
* Locking: contact read, message read.
* Locking: message read, subscription read.
*/
boolean hasSendableMessages(T txn, ContactId c) throws DbException;
@@ -503,7 +499,7 @@ interface Database<T> {
* Increments the outgoing connection counter for the given contact
* transport in the given rotation period and returns the old value;
* <p>
* Locking: contact read, transport read, window write.
* Locking: window write.
*/
long incrementConnectionCounter(T txn, ContactId c, TransportId t,
long period) throws DbException;
@@ -512,7 +508,7 @@ interface Database<T> {
* Increments the retention time versions for all contacts to indicate that
* the database's retention time has changed and updates should be sent.
* <p>
* Locking: contact read, retention write.
* Locking: retention write.
*/
void incrementRetentionVersions(T txn) throws DbException;
@@ -545,7 +541,7 @@ interface Database<T> {
/**
* Removes a message (and all associated state) from the database.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
void removeMessage(T txn, MessageId m) throws DbException;
@@ -553,7 +549,7 @@ interface Database<T> {
* Marks the given messages received from the given contact as having been
* acknowledged.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
void removeMessagesToAck(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
@@ -562,7 +558,7 @@ interface Database<T> {
* Marks any of the given messages that are considered outstanding with
* respect to the given contact as seen by the contact.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
void removeOutstandingMessages(T txn, ContactId c,
Collection<MessageId> acked) throws DbException;
@@ -571,21 +567,21 @@ interface Database<T> {
* Unsubscribes from the given group. Any messages belonging to the group
* are deleted from the database.
* <p>
* Locking: contact write, message write, subscription write.
* Locking: message write, subscription write.
*/
void removeSubscription(T txn, GroupId g) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
* <p>
* Locking: transport write.
* Locking: transport write, window write.
*/
void removeTransport(T txn, TransportId t) throws DbException;
/**
* Makes the given group invisible to the given contact.
* <p>
* Locking: contact write, subscription write.
* Locking: subscription write.
*/
void removeVisibility(T txn, ContactId c, GroupId g) throws DbException;
@@ -593,7 +589,7 @@ interface Database<T> {
* Sets the connection reordering window for the given endpoint in the
* given rotation period.
* <p>
* Locking: contact read, transport read, window write.
* Locking: window write.
*/
void setConnectionWindow(T txn, ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException;
@@ -601,7 +597,7 @@ interface Database<T> {
/**
* Sets the time at which a connection to the given contact was last made.
* <p>
* Locking: contact read, window write.
* Locking: window write.
*/
void setLastConnected(T txn, ContactId c, long now) throws DbException;
@@ -626,7 +622,7 @@ interface Database<T> {
* with an equal or higher version number has already been received from
* the contact.
* <p>
* Locking: contact read, transport write.
* Locking: transport write.
*/
void setRemoteProperties(T txn, ContactId c, TransportId t,
TransportProperties p, long version) throws DbException;
@@ -636,7 +632,7 @@ interface Database<T> {
* update with an equal or higher version number has already been received
* from the contact.
* <p>
* Locking: contact read, retention write.
* Locking: retention write.
*/
void setRetentionTime(T txn, ContactId c, long retention, long version)
throws DbException;
@@ -662,7 +658,7 @@ interface Database<T> {
* that is visible to the given contact, marks the message as seen by the
* contact and returns true; otherwise returns false.
* <p>
* Locking: contact read, message write, subscription read.
* Locking: message write, subscription read.
*/
boolean setStatusSeenIfVisible(T txn, ContactId c, MessageId m)
throws DbException;
@@ -672,7 +668,7 @@ interface Database<T> {
* update with an equal or higher version number has already been received
* from the contact.
* <p>
* Locking: contact read, subscription write.
* Locking: subscription write.
*/
void setSubscriptions(T txn, ContactId c, Collection<Group> subs,
long version) throws DbException;
@@ -681,7 +677,7 @@ interface Database<T> {
* Records a retention ack from the given contact for the given version
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: contact read, retention write.
* Locking: retention write.
*/
void setRetentionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
@@ -690,7 +686,7 @@ interface Database<T> {
* Records a subscription ack from the given contact for the given version
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: contact read, subscription write.
* Locking: subscription write.
*/
void setSubscriptionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
@@ -699,7 +695,7 @@ interface Database<T> {
* Records a transport ack from the give contact for the given version
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: contact read, transport write.
* Locking: transport write.
*/
void setTransportUpdateAcked(T txn, ContactId c, TransportId t,
long version) throws DbException;
@@ -709,7 +705,7 @@ interface Database<T> {
* contact, using the given transmission counts and the latency of the
* transport over which they were sent.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
void updateExpiryTimes(T txn, ContactId c, Map<MessageId, Integer> sent,
long maxLatency) throws DbException;

View File

@@ -254,21 +254,27 @@ DatabaseCleaner.Callback {
try {
messageLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
ratingLock.readLock().lock();
try {
T txn = db.startTransaction();
subscriptionLock.readLock().lock();
try {
// Don't store the message if the user has
// unsubscribed from the group
if(db.containsSubscription(txn, m.getGroup().getId()))
added = storeGroupMessage(txn, m, null);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
T txn = db.startTransaction();
try {
// Don't store the message if the user has
// unsubscribed from the group
GroupId g = m.getGroup().getId();
if(db.containsSubscription(txn, g))
added = storeGroupMessage(txn, m, null);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
subscriptionLock.readLock().unlock();
ratingLock.readLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
@@ -285,7 +291,7 @@ DatabaseCleaner.Callback {
* sendability of its ancestors if necessary, marks the message as seen by
* the sender and unseen by all other contacts, and returns true.
* <p>
* Locking: contact read, message write.
* Locking: contact read, message write, rating read.
* @param sender may be null for a locally generated message.
*/
private boolean storeGroupMessage(T txn, Message m, ContactId sender)
@@ -315,7 +321,7 @@ DatabaseCleaner.Callback {
/**
* Calculates and returns the sendability score of a message.
* <p>
* Locking: message write.
* Locking: message read, rating read.
*/
private int calculateSendability(T txn, Message m) throws DbException {
int sendability = 0;
@@ -431,13 +437,18 @@ DatabaseCleaner.Callback {
boolean added;
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
windowLock.writeLock().lock();
try {
added = db.addTransport(txn, t);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
T txn = db.startTransaction();
try {
added = db.addTransport(txn, t);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
windowLock.writeLock().unlock();
}
} finally {
transportLock.writeLock().unlock();
@@ -452,7 +463,7 @@ DatabaseCleaner.Callback {
* the given contact, depending on whether the message is outgoing or
* incoming, respectively.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
private boolean storePrivateMessage(T txn, Message m, ContactId c,
boolean incoming) throws DbException {
@@ -515,7 +526,7 @@ DatabaseCleaner.Callback {
// Get some sendable messages from the database
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
messageLock.readLock().lock();
try {
subscriptionLock.readLock().lock();
try {
@@ -537,7 +548,7 @@ DatabaseCleaner.Callback {
subscriptionLock.readLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
messageLock.readLock().unlock();
}
if(messages.isEmpty()) return null;
// Record the messages as sent
@@ -627,15 +638,20 @@ DatabaseCleaner.Callback {
try {
messageLock.readLock().lock();
try {
T txn = db.startTransaction();
subscriptionLock.readLock().lock();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
offered = db.getMessagesToOffer(txn, c, maxMessages);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
offered = db.getMessagesToOffer(txn, c, maxMessages);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageLock.readLock().unlock();
@@ -674,22 +690,27 @@ DatabaseCleaner.Callback {
throws DbException {
contactLock.readLock().lock();
try {
retentionLock.writeLock().lock();
messageLock.readLock().lock();
try {
T txn = db.startTransaction();
retentionLock.writeLock().lock();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
RetentionUpdate u =
db.getRetentionUpdate(txn, c, maxLatency);
db.commitTransaction(txn);
return u;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
RetentionUpdate u =
db.getRetentionUpdate(txn, c, maxLatency);
db.commitTransaction(txn);
return u;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
retentionLock.writeLock().unlock();
}
} finally {
retentionLock.writeLock().unlock();
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
@@ -882,17 +903,22 @@ DatabaseCleaner.Callback {
throws DbException {
messageLock.readLock().lock();
try {
T txn = db.startTransaction();
subscriptionLock.readLock().lock();
try {
if(!db.containsSubscription(txn, g))
throw new NoSuchSubscriptionException();
Collection<GroupMessageHeader> headers =
db.getMessageHeaders(txn, g);
db.commitTransaction(txn);
return headers;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
T txn = db.startTransaction();
try {
if(!db.containsSubscription(txn, g))
throw new NoSuchSubscriptionException();
Collection<GroupMessageHeader> headers =
db.getMessageHeaders(txn, g);
db.commitTransaction(txn);
return headers;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageLock.readLock().unlock();
@@ -956,53 +982,37 @@ DatabaseCleaner.Callback {
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
contactLock.readLock().lock();
transportLock.readLock().lock();
try {
transportLock.readLock().lock();
T txn = db.startTransaction();
try {
T txn = db.startTransaction();
try {
Map<ContactId, TransportProperties> properties =
db.getRemoteProperties(txn, t);
db.commitTransaction(txn);
return properties;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
Map<ContactId, TransportProperties> properties =
db.getRemoteProperties(txn, t);
db.commitTransaction(txn);
return properties;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
contactLock.readLock().unlock();
transportLock.readLock().unlock();
}
}
public Collection<TemporarySecret> getSecrets() throws DbException {
contactLock.readLock().lock();
windowLock.readLock().lock();
try {
transportLock.readLock().lock();
T txn = db.startTransaction();
try {
windowLock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<TemporarySecret> secrets =
db.getSecrets(txn);
db.commitTransaction(txn);
return secrets;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
windowLock.readLock().unlock();
}
} finally {
transportLock.readLock().unlock();
Collection<TemporarySecret> secrets = db.getSecrets(txn);
db.commitTransaction(txn);
return secrets;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
contactLock.readLock().unlock();
windowLock.readLock().unlock();
}
}
@@ -1045,20 +1055,14 @@ DatabaseCleaner.Callback {
public Map<GroupId, Integer> getUnreadMessageCounts() throws DbException {
messageLock.readLock().lock();
try {
subscriptionLock.readLock().lock();
T txn = db.startTransaction();
try {
T txn = db.startTransaction();
try {
Map<GroupId, Integer> counts =
db.getUnreadMessageCounts(txn);
db.commitTransaction(txn);
return counts;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
Map<GroupId, Integer> counts = db.getUnreadMessageCounts(txn);
db.commitTransaction(txn);
return counts;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageLock.readLock().unlock();
@@ -1066,26 +1070,21 @@ DatabaseCleaner.Callback {
}
public Collection<ContactId> getVisibility(GroupId g) throws DbException {
contactLock.readLock().lock();
subscriptionLock.readLock().lock();
try {
subscriptionLock.readLock().lock();
T txn = db.startTransaction();
try {
T txn = db.startTransaction();
try {
if(!db.containsSubscription(txn, g))
throw new NoSuchSubscriptionException();
Collection<ContactId> visible = db.getVisibility(txn, g);
db.commitTransaction(txn);
return visible;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
if(!db.containsSubscription(txn, g))
throw new NoSuchSubscriptionException();
Collection<ContactId> visible = db.getVisibility(txn, g);
db.commitTransaction(txn);
return visible;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
contactLock.readLock().unlock();
subscriptionLock.readLock().unlock();
}
}
@@ -1250,21 +1249,26 @@ DatabaseCleaner.Callback {
try {
messageLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
ratingLock.readLock().lock();
try {
T txn = db.startTransaction();
subscriptionLock.readLock().lock();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
added = storeMessage(txn, c, m);
db.addMessageToAck(txn, c, m.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
added = storeMessage(txn, c, m);
db.addMessageToAck(txn, c, m.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
subscriptionLock.readLock().unlock();
ratingLock.readLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
@@ -1280,7 +1284,7 @@ DatabaseCleaner.Callback {
* Attempts to store a message received from the given contact, and returns
* true if it was stored.
* <p>
* Locking: contact read, message write, subscription read.
* Locking: contact read, message write, rating read, subscription read.
*/
private boolean storeMessage(T txn, ContactId c, Message m)
throws DbException {
@@ -1530,15 +1534,20 @@ DatabaseCleaner.Callback {
public void removeTransport(TransportId t) throws DbException {
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
windowLock.writeLock().lock();
try {
if(!db.containsTransport(txn, t))
throw new NoSuchTransportException();
db.removeTransport(txn, t);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
T txn = db.startTransaction();
try {
if(!db.containsTransport(txn, t))
throw new NoSuchTransportException();
db.removeTransport(txn, t);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
windowLock.writeLock().unlock();
}
} finally {
transportLock.writeLock().unlock();
@@ -1705,7 +1714,7 @@ DatabaseCleaner.Callback {
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
Collection<ContactId> affected = new ArrayList<ContactId>();
contactLock.writeLock().lock();
contactLock.readLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
@@ -1739,7 +1748,7 @@ DatabaseCleaner.Callback {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
contactLock.readLock().unlock();
}
if(!affected.isEmpty())
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
@@ -1767,31 +1776,26 @@ DatabaseCleaner.Callback {
public void unsubscribe(GroupId g) throws DbException {
Collection<ContactId> affected;
contactLock.writeLock().lock();
messageLock.writeLock().lock();
try {
messageLock.writeLock().lock();
subscriptionLock.writeLock().lock();
try {
subscriptionLock.writeLock().lock();
T txn = db.startTransaction();
try {
T txn = db.startTransaction();
try {
if(!db.containsSubscription(txn, g))
throw new NoSuchSubscriptionException();
affected = db.getVisibility(txn, g);
db.removeSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
if(!db.containsSubscription(txn, g))
throw new NoSuchSubscriptionException();
affected = db.getVisibility(txn, g);
db.removeSubscription(txn, g);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageLock.writeLock().unlock();
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
messageLock.writeLock().unlock();
}
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
}
@@ -1817,34 +1821,28 @@ DatabaseCleaner.Callback {
*/
private boolean expireMessages(int size) throws DbException {
boolean removed = false;
contactLock.readLock().lock();
messageLock.writeLock().lock();
try {
messageLock.writeLock().lock();
retentionLock.writeLock().lock();
try {
retentionLock.writeLock().lock();
T txn = db.startTransaction();
try {
T txn = db.startTransaction();
try {
Collection<MessageId> old =
db.getOldMessages(txn, size);
if(!old.isEmpty()) {
for(MessageId m : old) removeMessage(txn, m);
db.incrementRetentionVersions(txn);
removed = true;
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
Collection<MessageId> old = db.getOldMessages(txn, size);
if(!old.isEmpty()) {
for(MessageId m : old) removeMessage(txn, m);
db.incrementRetentionVersions(txn);
removed = true;
}
} finally {
retentionLock.writeLock().unlock();
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageLock.writeLock().unlock();
retentionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
messageLock.writeLock().unlock();
}
if(removed) callListeners(new MessageExpiredEvent());
return removed;
@@ -1853,7 +1851,7 @@ DatabaseCleaner.Callback {
/**
* Removes the given message (and all associated state) from the database.
* <p>
* Locking: contact read, message write.
* Locking: message write.
*/
private void removeMessage(T txn, MessageId m) throws DbException {
int sendability = db.getSendability(txn, m);

View File

@@ -65,6 +65,14 @@ class H2Database extends JdbcDatabase {
}
}
private long getDiskSpace(File f) {
long total = 0;
if(f.isDirectory()) {
for(File child : f.listFiles()) total += getDiskSpace(child);
return total;
} else return f.length();
}
@Override
protected Connection createConnection() throws SQLException {
Properties props = new Properties();

View File

@@ -61,6 +61,7 @@ import net.sf.briar.util.FileUtils;
abstract class JdbcDatabase implements Database<Connection> {
// Locking: contact
// Dependents: message, retention, subscription, transport, window
private static final String CREATE_CONTACTS =
"CREATE TABLE contacts "
+ " (contactId COUNTER,"
@@ -68,6 +69,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " PRIMARY KEY (contactId))";
// Locking: subscription
// Dependents: message
private static final String CREATE_GROUPS =
"CREATE TABLE groups"
+ " (groupId HASH NOT NULL,"
@@ -75,7 +77,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " key BINARY," // Null for unrestricted groups
+ " PRIMARY KEY (groupId))";
// Locking: contact read, subscription
// Locking: subscription
private static final String CREATE_GROUP_VISIBILITIES =
"CREATE TABLE groupVisibilities"
+ " (contactId INT NOT NULL,"
@@ -87,7 +89,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES groups (groupId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, subscription
// Locking: subscription
private static final String CREATE_CONTACT_GROUPS =
"CREATE TABLE contactGroups"
+ " (contactId INT NOT NULL,"
@@ -99,7 +101,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, subscription
// Locking: subscription
private static final String CREATE_GROUP_VERSIONS =
"CREATE TABLE groupVersions"
+ " (contactId INT NOT NULL,"
@@ -153,17 +155,17 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_MESSAGES_BY_SENDABILITY =
"CREATE INDEX messagesBySendability ON messages (sendability)";
// Locking: contact read, message
// Locking: message
private static final String CREATE_MESSAGES_TO_ACK =
"CREATE TABLE messagesToAck"
+ " (messageId HASH NOT NULL,"
+ " (messageId HASH NOT NULL," // Not a foreign key
+ " contactId INT NOT NULL,"
+ " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, message
// Locking: message
private static final String CREATE_STATUSES =
"CREATE TABLE statuses"
+ " (messageId HASH NOT NULL,"
@@ -192,7 +194,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " rating SMALLINT NOT NULL,"
+ " PRIMARY KEY (authorId))";
// Locking: contact read, retention
// Locking: retention
private static final String CREATE_RETENTION_VERSIONS =
"CREATE TABLE retentionVersions"
+ " (contactId INT NOT NULL,"
@@ -209,6 +211,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " ON DELETE CASCADE)";
// Locking: transport
// Dependents: window
private static final String CREATE_TRANSPORTS =
"CREATE TABLE transports (transportId HASH NOT NULL)";
@@ -234,7 +237,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, transport
// Locking: transport
private static final String CREATE_TRANSPORT_VERSIONS =
"CREATE TABLE transportVersions"
+ " (contactId INT NOT NULL,"
@@ -251,7 +254,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, transport
// Locking: transport
private static final String CREATE_CONTACT_TRANSPORT_PROPS =
"CREATE TABLE contactTransportProperties"
+ " (contactId INT NOT NULL,"
@@ -263,7 +266,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, transport
// Locking: transport
private static final String CREATE_CONTACT_TRANSPORT_VERSIONS =
"CREATE TABLE contactTransportVersions"
+ " (contactId INT NOT NULL,"
@@ -275,7 +278,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, transport read, window
// Locking: window
private static final String CREATE_ENDPOINTS =
"CREATE TABLE endpoints"
+ " (contactId INT NOT NULL,"
@@ -292,7 +295,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, transport read, window
// Locking: window
private static final String CREATE_SECRETS =
"CREATE TABLE secrets"
+ " (contactId INT NOT NULL,"
@@ -310,7 +313,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
// Locking: contact read, window
// Locking: window
private static final String CREATE_CONNECTION_TIMES =
"CREATE TABLE connectionTimes"
+ " (contactId INT NOT NULL,"
@@ -754,7 +757,7 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT COUNT (groupId) from GROUPS";
String sql = "SELECT COUNT (groupId) FROM groups";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
if(!rs.next()) throw new DbStateException();
@@ -1118,14 +1121,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
protected long getDiskSpace(File f) {
long total = 0;
if(f.isDirectory()) {
for(File child : f.listFiles()) total += getDiskSpace(child);
return total;
} else return f.length();
}
public MessageId getGroupMessageParent(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;