Removed database locking.

This commit is contained in:
akwizgran
2016-02-05 17:49:49 +00:00
parent 0c392e8b78
commit 623707af0f
3 changed files with 506 additions and 949 deletions

View File

@@ -30,27 +30,23 @@ import java.util.Map;
* obtained by calling {@link #startTransaction()}. Every transaction must be
* terminated by calling either {@link #abortTransaction(T)} or
* {@link #commitTransaction(T)}, even if an exception is thrown.
* <p>
* Read-write locking is provided by the DatabaseComponent implementation.
*/
interface Database<T> {
/**
* Opens the database and returns true if the database already existed.
* <p>
* Locking: write.
*/
boolean open() throws DbException;
/**
* Prevents new transactions from starting, waits for all current
* transactions to finish, and closes the database.
* <p>
* Locking: write.
*/
void close() throws DbException, IOException;
/** Starts a new transaction and returns an object representing it. */
/**
* Starts a new transaction and returns an object representing it.
*/
T startTransaction() throws DbException;
/**
@@ -65,58 +61,38 @@ interface Database<T> {
*/
void commitTransaction(T txn) throws DbException;
/**
* Returns the number of transactions started since the transaction count
* was last reset.
*/
int getTransactionCount();
/** Resets the transaction count. */
void resetTransactionCount();
/**
* Stores a contact associated with the given local and remote pseudonyms,
* and returns an ID for the contact.
* <p>
* Locking: write.
*/
ContactId addContact(T txn, Author remote, AuthorId local)
throws DbException;
/**
* Stores a group.
* <p>
* Locking: write.
*/
void addGroup(T txn, Group g) throws DbException;
/**
* Stores a local pseudonym.
* <p>
* Locking: write.
*/
void addLocalAuthor(T txn, LocalAuthor a) throws DbException;
/**
* Stores a message.
* <p>
* Locking: write.
*/
void addMessage(T txn, Message m, Validity validity, boolean shared)
throws DbException;
/**
* Records that a message has been offered by the given contact.
* <p>
* Locking: write.
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
/**
* Initialises the status of the given message with respect to the given
* contact.
* <p>
* Locking: write.
*
* @param ack whether the message needs to be acknowledged.
* @param seen whether the contact has seen the message.
*/
@@ -125,76 +101,56 @@ interface Database<T> {
/**
* Stores a transport.
* <p>
* Locking: write.
*/
void addTransport(T txn, TransportId t, int maxLatency)
throws DbException;
/**
* Stores transport keys for a newly added contact.
* <p>
* Locking: write.
*/
void addTransportKeys(T txn, ContactId c, TransportKeys k)
throws DbException;
/**
* Makes a group visible to the given contact.
* <p>
* Locking: write.
*/
void addVisibility(T txn, ContactId c, GroupId g) throws DbException;
/**
* Returns true if the database contains the given contact for the given
* local pseudonym.
* <p>
* Locking: read.
*/
boolean containsContact(T txn, AuthorId remote, AuthorId local)
throws DbException;
/**
* Returns true if the database contains the given contact.
* <p>
* Locking: read.
*/
boolean containsContact(T txn, ContactId c) throws DbException;
/**
* Returns true if the database contains the given group.
* <p>
* Locking: read.
*/
boolean containsGroup(T txn, GroupId g) throws DbException;
/**
* Returns true if the database contains the given local pseudonym.
* <p>
* Locking: read.
*/
boolean containsLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns true if the database contains the given message.
* <p>
* Locking: read.
*/
boolean containsMessage(T txn, MessageId m) throws DbException;
/**
* Returns true if the database contains the given transport.
* <p>
* Locking: read.
*/
boolean containsTransport(T txn, TransportId t) throws DbException;
/**
* Returns true if the database contains the given group and the group is
* visible to the given contact.
* <p>
* Locking: read.
*/
boolean containsVisibleGroup(T txn, ContactId c, GroupId g)
throws DbException;
@@ -202,16 +158,12 @@ interface Database<T> {
/**
* Returns true if the database contains the given message and the message
* is visible to the given contact.
* <p>
* Locking: read.
*/
boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException;
/**
* Returns the number of messages offered by the given contact.
* <p>
* Locking: read.
*/
int countOfferedMessages(T txn, ContactId c) throws DbException;
@@ -234,36 +186,26 @@ interface Database<T> {
/**
* Returns the contact with the given ID.
* <p>
* Locking: read.
*/
Contact getContact(T txn, ContactId c) throws DbException;
/**
* Returns the IDs of all contacts.
* <p>
* Locking: read.
*/
Collection<ContactId> getContactIds(T txn) throws DbException;
/**
* Returns all contacts.
* <p>
* Locking: read.
*/
Collection<Contact> getContacts(T txn) throws DbException;
/**
* Returns all contacts associated with the given local pseudonym.
* <p>
* Locking: read.
*/
Collection<ContactId> getContacts(T txn, AuthorId a) throws DbException;
/**
* Returns the unique ID for this device.
* <p>
* Locking: read.
*/
DeviceId getDeviceId(T txn) throws DbException;
@@ -276,59 +218,43 @@ interface Database<T> {
/**
* Returns the group with the given ID.
* <p>
* Locking: read.
*/
Group getGroup(T txn, GroupId g) throws DbException;
/**
* Returns the metadata for the given group.
* <p>
* Locking: read.
*/
Metadata getGroupMetadata(T txn, GroupId g) throws DbException;
/**
* Returns all groups belonging to the given client.
* <p>
* Locking: read.
*/
Collection<Group> getGroups(T txn, ClientId c) throws DbException;
/**
* Returns the local pseudonym with the given ID.
* <p>
* Locking: read.
*/
LocalAuthor getLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns all local pseudonyms.
* <p>
* Locking: read.
*/
Collection<LocalAuthor> getLocalAuthors(T txn) throws DbException;
/**
* Returns the metadata for all messages in the given group.
* <p>
* Locking: read.
*/
Map<MessageId, Metadata> getMessageMetadata(T txn, GroupId g)
throws DbException;
/**
* Returns the metadata for the given message.
* <p>
* Locking: read.
*/
Metadata getMessageMetadata(T txn, MessageId m) throws DbException;
/**
* Returns the status of all messages in the given group with respect
* to the given contact.
* <p>
* Locking: read
*/
Collection<MessageStatus> getMessageStatus(T txn, ContactId c, GroupId g)
throws DbException;
@@ -336,8 +262,6 @@ interface Database<T> {
/**
* Returns the status of the given message with respect to the given
* contact.
* <p>
* Locking: read
*/
MessageStatus getMessageStatus(T txn, ContactId c, MessageId m)
throws DbException;
@@ -345,8 +269,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
@@ -354,8 +276,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages that are eligible to be offered to the
* given contact, up to the given number of messages.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -363,8 +283,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given total length.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength)
throws DbException;
@@ -372,8 +290,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -381,16 +297,12 @@ interface Database<T> {
/**
* Returns the IDs of any messages that need to be validated by the given
* client.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToValidate(T txn, ClientId c)
throws DbException;
/**
* Returns the message with the given ID, in serialised form.
* <p>
* Locking: read.
*/
byte[] getRawMessage(T txn, MessageId m) throws DbException;
@@ -398,46 +310,34 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be sent to the
* given contact and have been requested by the contact, up to the given
* total length.
* <p>
* Locking: read.
*/
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
int maxLength) throws DbException;
/**
* Returns all settings in the given namespace.
* <p>
* Locking: read.
*/
Settings getSettings(T txn, String namespace) throws DbException;
/**
* Returns all transport keys for the given transport.
* <p>
* Locking: read.
*/
Map<ContactId, TransportKeys> getTransportKeys(T txn, TransportId t)
throws DbException;
/**
* Returns the maximum latencies in milliseconds of all transports.
* <p>
* Locking: read.
*/
Map<TransportId, Integer> getTransportLatencies(T txn) throws DbException;
/**
* Returns the IDs of all contacts to which the given group is visible.
* <p>
* Locking: read.
*/
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
/**
* Increments the outgoing stream counter for the given contact and
* transport in the given rotation period.
* <p>
* Locking: write.
*/
void incrementStreamCounter(T txn, ContactId c, TransportId t,
long rotationPeriod) throws DbException;
@@ -445,8 +345,6 @@ interface Database<T> {
/**
* Marks the given messages as not needing to be acknowledged to the
* given contact.
* <p>
* Locking: write.
*/
void lowerAckFlag(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
@@ -454,8 +352,6 @@ interface Database<T> {
/**
* Marks the given messages as not having been requested by the given
* contact.
* <p>
* Locking: write.
*/
void lowerRequestedFlag(T txn, ContactId c, Collection<MessageId> requested)
throws DbException;
@@ -463,8 +359,6 @@ interface Database<T> {
/*
* Merges the given metadata with the existing metadata for the given
* group.
* <p>
* Locking: write.
*/
void mergeGroupMetadata(T txn, GroupId g, Metadata meta)
throws DbException;
@@ -472,8 +366,6 @@ interface Database<T> {
/*
* Merges the given metadata with the existing metadata for the given
* message.
* <p>
* Locking: write.
*/
void mergeMessageMetadata(T txn, MessageId m, Metadata meta)
throws DbException;
@@ -481,65 +373,47 @@ interface Database<T> {
/**
* Merges the given settings with the existing settings in the given
* namespace.
* <p>
* Locking: write.
*/
void mergeSettings(T txn, Settings s, String namespace) throws DbException;
/**
* Marks a message as needing to be acknowledged to the given contact.
* <p>
* Locking: write.
*/
void raiseAckFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been requested by the given contact.
* <p>
* Locking: write.
*/
void raiseRequestedFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been seen by the given contact.
* <p>
* Locking: write.
*/
void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a contact from the database.
* <p>
* Locking: write.
*/
void removeContact(T txn, ContactId c) throws DbException;
/**
* Removes a group (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeGroup(T txn, GroupId g) throws DbException;
/**
* Removes a local pseudonym (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Removes a message (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeMessage(T txn, MessageId m) throws DbException;
/**
* Removes an offered message that was offered by the given contact, or
* returns false if there is no such message.
* <p>
* Locking: write.
*/
boolean removeOfferedMessage(T txn, ContactId c, MessageId m)
throws DbException;
@@ -547,70 +421,52 @@ interface Database<T> {
/**
* Removes the given offered messages that were offered by the given
* contact.
* <p>
* Locking: write.
*/
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeTransport(T txn, TransportId t) throws DbException;
/**
* Makes a group invisible to the given contact.
* <p>
* Locking: write.
*/
void removeVisibility(T txn, ContactId c, GroupId g) throws DbException;
/**
* Resets the transmission count and expiry time of the given message with
* respect to the given contact.
* <p>
* Locking: write.
*/
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
/**
* Sets the status of the given contact.
* <p>
* Locking: write.
*/
void setContactStatus(T txn, ContactId c, StorageStatus s)
throws DbException;
/**
* Sets the status of the given local pseudonym.
* <p>
* Locking: write.
*/
void setLocalAuthorStatus(T txn, AuthorId a, StorageStatus s)
throws DbException;
/**
* Marks the given message as shared or unshared.
* <p>
* Locking: write.
*/
void setMessageShared(T txn, MessageId m, boolean shared)
throws DbException;
/**
* Marks the given message as valid or invalid.
* <p>
* Locking: write.
*/
void setMessageValid(T txn, MessageId m, boolean valid) throws DbException;
/**
* Sets the reordering window for the given contact and transport in the
* given rotation period.
* <p>
* Locking: write.
*/
void setReorderingWindow(T txn, ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException;
@@ -619,16 +475,12 @@ interface Database<T> {
* Updates the transmission count and expiry time of the given message
* with respect to the given contact, using the latency of the transport
* over which it was sent.
* <p>
* Locking: write.
*/
void updateExpiryTime(T txn, ContactId c, MessageId m, int maxLatency)
throws DbException;
/**
* Stores the given transport keys, deleting any keys they have replaced.
* <p>
* Locking: write.
*/
void updateTransportKeys(T txn, Map<ContactId, TransportKeys> keys)
throws DbException;

View File

@@ -55,7 +55,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -79,12 +79,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
private final Database<T> db;
private final EventBus eventBus;
private final ShutdownManager shutdown;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ReentrantReadWriteLock lock =
new ReentrantReadWriteLock(true);
private boolean open = false; // Locking: lock.writeLock
private int shutdownHandle = -1; // Locking: lock.writeLock
private volatile int shutdownHandle = -1;
@Inject
DatabaseComponentImpl(Database<T> db, EventBus eventBus,
@@ -97,9 +94,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public boolean open() throws DbException {
Runnable shutdownHook = new Runnable() {
public void run() {
lock.writeLock().lock();
try {
shutdownHandle = -1;
close();
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
@@ -107,40 +102,22 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
} catch (IOException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
}
};
lock.writeLock().lock();
try {
if (open) throw new IllegalStateException();
open = true;
boolean reopened = db.open();
shutdownHandle = shutdown.addShutdownHook(shutdownHook);
return reopened;
} finally {
lock.writeLock().unlock();
}
}
public void close() throws DbException, IOException {
lock.writeLock().lock();
try {
if (!open) return;
open = false;
if (shutdownHandle != -1)
if (closed.getAndSet(true)) return;
shutdown.removeShutdownHook(shutdownHandle);
db.close();
} finally {
lock.writeLock().unlock();
}
}
public ContactId addContact(Author remote, AuthorId local)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsLocalAuthor(txn, local))
@@ -154,15 +131,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void addGroup(Group g) throws DbException {
boolean added = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, g.getId())) {
@@ -174,35 +146,24 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (added) eventBus.broadcast(new GroupAddedEvent(g));
}
public void addLocalAuthor(LocalAuthor a) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsLocalAuthor(txn, a.getId())) {
if (!db.containsLocalAuthor(txn, a.getId()))
db.addLocalAuthor(txn, a);
}
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void addLocalMessage(Message m, ClientId c, Metadata meta,
boolean shared) throws DbException {
boolean added = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, m.getGroupId()))
@@ -217,9 +178,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (added) {
eventBus.broadcast(new MessageAddedEvent(m, null));
eventBus.broadcast(new MessageValidatedEvent(m, c, true, true));
@@ -229,8 +187,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
/**
* Stores a message and initialises its status with respect to each contact.
* <p>
* Locking: write.
*
* @param sender null for a locally generated message.
*/
private void addMessage(T txn, Message m, Validity validity, boolean shared,
@@ -253,8 +210,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void addTransport(TransportId t, int maxLatency) throws DbException {
boolean added = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsTransport(txn, t)) {
@@ -266,16 +221,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (added) eventBus.broadcast(new TransportAddedEvent(t, maxLatency));
}
public void addTransportKeys(ContactId c, TransportKeys k)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -288,9 +238,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void deleteMessage(MessageId m) throws DbException {
@@ -329,8 +276,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Ack generateAck(ContactId c, int maxMessages) throws DbException {
Collection<MessageId> ids;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -342,9 +287,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (ids.isEmpty()) return null;
return new Ack(ids);
}
@@ -353,8 +295,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
int maxLatency) throws DbException {
Collection<MessageId> ids;
List<byte[]> messages = new ArrayList<byte[]>();
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -370,9 +310,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (messages.isEmpty()) return null;
if (!ids.isEmpty()) eventBus.broadcast(new MessagesSentEvent(c, ids));
return Collections.unmodifiableList(messages);
@@ -381,23 +318,17 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Offer generateOffer(ContactId c, int maxMessages, int maxLatency)
throws DbException {
Collection<MessageId> ids;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
ids = db.getMessagesToOffer(txn, c, maxMessages);
for (MessageId m : ids)
db.updateExpiryTime(txn, c, m, maxLatency);
for (MessageId m : ids) db.updateExpiryTime(txn, c, m, maxLatency);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (ids.isEmpty()) return null;
return new Offer(ids);
}
@@ -405,8 +336,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Request generateRequest(ContactId c, int maxMessages)
throws DbException {
Collection<MessageId> ids;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -418,9 +347,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (ids.isEmpty()) return null;
return new Request(ids);
}
@@ -429,8 +355,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
int maxLatency) throws DbException {
Collection<MessageId> ids;
List<byte[]> messages = new ArrayList<byte[]>();
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -446,17 +370,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (messages.isEmpty()) return null;
if (!ids.isEmpty()) eventBus.broadcast(new MessagesSentEvent(c, ids));
return Collections.unmodifiableList(messages);
}
public Contact getContact(ContactId c) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -468,14 +387,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<Contact> getContacts() throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<Contact> contacts = db.getContacts(txn);
@@ -485,14 +399,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<ContactId> getContacts(AuthorId a) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsLocalAuthor(txn, a))
@@ -504,14 +413,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public DeviceId getDeviceId() throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
DeviceId id = db.getDeviceId(txn);
@@ -521,14 +425,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Group getGroup(GroupId g) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, g))
@@ -540,14 +439,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Metadata getGroupMetadata(GroupId g) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, g))
@@ -559,14 +453,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<Group> getGroups(ClientId c) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<Group> groups = db.getGroups(txn, c);
@@ -576,14 +465,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public LocalAuthor getLocalAuthor(AuthorId a) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsLocalAuthor(txn, a))
@@ -595,14 +479,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<LocalAuthor> getLocalAuthors() throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<LocalAuthor> authors = db.getLocalAuthors(txn);
@@ -612,15 +491,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<MessageId> getMessagesToValidate(ClientId c)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<MessageId> ids = db.getMessagesToValidate(txn, c);
@@ -630,14 +504,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public byte[] getRawMessage(MessageId m) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsMessage(txn, m))
@@ -649,35 +518,24 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Map<MessageId, Metadata> getMessageMetadata(GroupId g)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, g))
throw new NoSuchGroupException();
Map<MessageId, Metadata> metadata =
db.getMessageMetadata(txn, g);
Map<MessageId, Metadata> metadata = db.getMessageMetadata(txn, g);
db.commitTransaction(txn);
return metadata;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Metadata getMessageMetadata(MessageId m) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsMessage(txn, m))
@@ -689,38 +547,27 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<MessageStatus> getMessageStatus(ContactId c, GroupId g)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (!db.containsGroup(txn, g))
throw new NoSuchGroupException();
Collection<MessageStatus> statuses =
db.getMessageStatus(txn, c, g);
Collection<MessageStatus> statuses = db.getMessageStatus(txn, c, g);
db.commitTransaction(txn);
return statuses;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public MessageStatus getMessageStatus(ContactId c, MessageId m)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -734,14 +581,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Settings getSettings(String namespace) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Settings s = db.getSettings(txn, namespace);
@@ -751,54 +593,37 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Map<ContactId, TransportKeys> getTransportKeys(TransportId t)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
Map<ContactId, TransportKeys> keys =
db.getTransportKeys(txn, t);
Map<ContactId, TransportKeys> keys = db.getTransportKeys(txn, t);
db.commitTransaction(txn);
return keys;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Map<TransportId, Integer> getTransportLatencies()
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Map<TransportId, Integer> latencies =
db.getTransportLatencies(txn);
Map<TransportId, Integer> latencies = db.getTransportLatencies(txn);
db.commitTransaction(txn);
return latencies;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<ContactId> getVisibility(GroupId g) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, g))
@@ -810,15 +635,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public void incrementStreamCounter(ContactId c, TransportId t,
long rotationPeriod) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -831,15 +651,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public boolean isVisibleToContact(ContactId c, GroupId g)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -853,15 +668,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public void mergeGroupMetadata(GroupId g, Metadata meta)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, g))
@@ -872,15 +682,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void mergeMessageMetadata(MessageId m, Metadata meta)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsMessage(txn, m))
@@ -891,15 +696,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void mergeSettings(Settings s, String namespace) throws DbException {
boolean changed = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
Settings old = db.getSettings(txn, namespace);
@@ -915,16 +715,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (changed) eventBus.broadcast(new SettingsUpdatedEvent(namespace));
}
public void receiveAck(ContactId c, Ack a) throws DbException {
Collection<MessageId> acked = new ArrayList<MessageId>();
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -940,16 +735,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
eventBus.broadcast(new MessagesAckedEvent(c, acked));
}
public void receiveMessage(ContactId c, Message m) throws DbException {
boolean duplicate, visible;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -965,9 +755,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (visible) {
if (!duplicate) eventBus.broadcast(new MessageAddedEvent(m, c));
eventBus.broadcast(new MessageToAckEvent(c));
@@ -976,8 +763,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void receiveOffer(ContactId c, Offer o) throws DbException {
boolean ack = false, request = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -999,17 +784,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (ack) eventBus.broadcast(new MessageToAckEvent(c));
if (request) eventBus.broadcast(new MessageToRequestEvent(c));
}
public void receiveRequest(ContactId c, Request r) throws DbException {
boolean requested = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -1026,15 +806,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (requested) eventBus.broadcast(new MessageRequestedEvent(c));
}
public void removeContact(ContactId c) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -1045,15 +820,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void removeGroup(Group g) throws DbException {
Collection<ContactId> affected;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
GroupId id = g.getId();
@@ -1066,16 +836,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
eventBus.broadcast(new GroupRemovedEvent(g));
eventBus.broadcast(new GroupVisibilityUpdatedEvent(affected));
}
public void removeLocalAuthor(AuthorId a) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsLocalAuthor(txn, a))
@@ -1086,14 +851,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void removeTransport(TransportId t) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsTransport(txn, t))
@@ -1104,16 +864,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
eventBus.broadcast(new TransportRemovedEvent(t));
}
public void setContactStatus(ContactId c, StorageStatus s)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -1124,15 +879,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void setLocalAuthorStatus(AuthorId a, StorageStatus s)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsLocalAuthor(txn, a))
@@ -1143,15 +893,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void setMessageShared(Message m, boolean shared)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsMessage(txn, m.getId()))
@@ -1162,16 +907,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (shared) eventBus.broadcast(new MessageSharedEvent(m));
}
public void setMessageValid(Message m, ClientId c, boolean valid)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsMessage(txn, m.getId()))
@@ -1182,16 +922,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
eventBus.broadcast(new MessageValidatedEvent(m, c, false, valid));
}
public void setReorderingWindow(ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -1204,16 +939,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
Collection<ContactId> affected = new ArrayList<ContactId>();
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsGroup(txn, g))
@@ -1239,9 +969,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (!affected.isEmpty())
eventBus.broadcast(new GroupVisibilityUpdatedEvent(affected));
}
@@ -1249,8 +976,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void setVisibleToContact(ContactId c, GroupId g, boolean visible)
throws DbException {
boolean wasVisible = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
@@ -1265,9 +990,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (visible != wasVisible) {
eventBus.broadcast(new GroupVisibilityUpdatedEvent(
Collections.singletonList(c)));
@@ -1276,8 +998,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void updateTransportKeys(Map<ContactId, TransportKeys> keys)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
Map<ContactId, TransportKeys> filtered =
@@ -1296,8 +1016,5 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
}

View File

@@ -41,7 +41,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -228,8 +227,6 @@ abstract class JdbcDatabase implements Database<Connection> {
private final LinkedList<Connection> connections =
new LinkedList<Connection>(); // Locking: connectionsLock
private final AtomicInteger transactionCount = new AtomicInteger(0);
private int openConnections = 0; // Locking: connectionsLock
private boolean closed = false; // Locking: connectionsLock
@@ -369,7 +366,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} catch (SQLException e) {
throw new DbException(e);
}
transactionCount.incrementAndGet();
return txn;
}
@@ -418,14 +414,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public int getTransactionCount() {
return transactionCount.get();
}
public void resetTransactionCount() {
transactionCount.set(0);
}
protected void closeAllConnections() throws SQLException {
boolean interrupted = false;
connectionsLock.lock();