Use a lock to ensure transaction isolation. #272

This commit is contained in:
akwizgran
2016-03-24 17:18:54 +00:00
parent 9714713d73
commit 1855dbbd2d
22 changed files with 248 additions and 189 deletions

View File

@@ -59,6 +59,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -78,6 +80,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
private final EventBus eventBus;
private final ShutdownManager shutdown;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private volatile int shutdownHandle = -1;
@@ -115,18 +118,33 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.close();
}
public Transaction startTransaction() throws DbException {
return new Transaction(db.startTransaction());
public Transaction startTransaction(boolean readOnly) throws DbException {
if (readOnly) lock.readLock().lock();
else lock.writeLock().lock();
try {
return new Transaction(db.startTransaction(), readOnly);
} catch (DbException e) {
if (readOnly) lock.readLock().unlock();
else lock.writeLock().unlock();
throw e;
} catch (RuntimeException e) {
if (readOnly) lock.readLock().unlock();
else lock.writeLock().unlock();
throw e;
}
}
public void endTransaction(Transaction transaction) throws DbException {
T txn = txnClass.cast(transaction.unbox());
if (transaction.isComplete()) {
db.commitTransaction(txn);
for (Event e : transaction.getEvents()) eventBus.broadcast(e);
} else {
db.abortTransaction(txn);
try {
T txn = txnClass.cast(transaction.unbox());
if (transaction.isComplete()) db.commitTransaction(txn);
else db.abortTransaction(txn);
} finally {
if (transaction.isReadOnly()) lock.readLock().unlock();
else lock.writeLock().unlock();
}
if (transaction.isComplete())
for (Event e : transaction.getEvents()) eventBus.broadcast(e);
}
private T unbox(Transaction transaction) {
@@ -136,6 +154,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public ContactId addContact(Transaction transaction, Author remote,
AuthorId local, boolean active) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsLocalAuthor(txn, local))
throw new NoSuchLocalAuthorException();
@@ -148,6 +167,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
public void addGroup(Transaction transaction, Group g) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsGroup(txn, g.getId())) {
db.addGroup(txn, g);
@@ -157,6 +177,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void addLocalAuthor(Transaction transaction, LocalAuthor a)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsLocalAuthor(txn, a.getId())) {
db.addLocalAuthor(txn, a);
@@ -166,6 +187,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void addLocalMessage(Transaction transaction, Message m, ClientId c,
Metadata meta, boolean shared) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsGroup(txn, m.getGroupId()))
throw new NoSuchGroupException();
@@ -189,6 +211,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void addTransport(Transaction transaction, TransportId t,
int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsTransport(txn, t))
db.addTransport(txn, t, maxLatency);
@@ -196,6 +219,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void addTransportKeys(Transaction transaction, ContactId c,
TransportKeys k) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -206,6 +230,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void deleteMessage(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
@@ -214,6 +239,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void deleteMessageMetadata(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
@@ -222,6 +248,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Ack generateAck(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -233,6 +260,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Collection<byte[]> generateBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -250,6 +278,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Offer generateOffer(Transaction transaction, ContactId c,
int maxMessages, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -261,6 +290,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Request generateRequest(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -273,6 +303,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public Collection<byte[]> generateRequestedBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -418,6 +449,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void incrementStreamCounter(Transaction transaction, ContactId c,
TransportId t, long rotationPeriod) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -438,6 +470,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void mergeGroupMetadata(Transaction transaction, GroupId g,
Metadata meta) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsGroup(txn, g))
throw new NoSuchGroupException();
@@ -446,6 +479,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void mergeMessageMetadata(Transaction transaction, MessageId m,
Metadata meta) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
@@ -454,6 +488,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void mergeSettings(Transaction transaction, Settings s,
String namespace) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
Settings old = db.getSettings(txn, namespace);
Settings merged = new Settings();
@@ -467,6 +502,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void receiveAck(Transaction transaction, ContactId c, Ack a)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -482,6 +518,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void receiveMessage(Transaction transaction, ContactId c, Message m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -497,6 +534,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void receiveOffer(Transaction transaction, ContactId c, Offer o)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -519,6 +557,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void receiveRequest(Transaction transaction, ContactId c, Request r)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -535,6 +574,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void removeContact(Transaction transaction, ContactId c)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -544,6 +584,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void removeGroup(Transaction transaction, Group g)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
GroupId id = g.getId();
if (!db.containsGroup(txn, id))
@@ -556,6 +597,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void removeLocalAuthor(Transaction transaction, AuthorId a)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsLocalAuthor(txn, a))
throw new NoSuchLocalAuthorException();
@@ -565,6 +607,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void removeTransport(Transaction transaction, TransportId t)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
@@ -573,6 +616,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void setContactActive(Transaction transaction, ContactId c,
boolean active) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -582,6 +626,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void setMessageShared(Transaction transaction, Message m,
boolean shared) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m.getId()))
throw new NoSuchMessageException();
@@ -591,6 +636,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void setMessageValid(Transaction transaction, Message m, ClientId c,
boolean valid) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m.getId()))
throw new NoSuchMessageException();
@@ -601,6 +647,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void setReorderingWindow(Transaction transaction, ContactId c,
TransportId t, long rotationPeriod, long base, byte[] bitmap)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -611,6 +658,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void setVisibleToContact(Transaction transaction, ContactId c,
GroupId g, boolean visible) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -636,6 +684,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void updateTransportKeys(Transaction transaction,
Map<ContactId, TransportKeys> keys) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
Map<ContactId, TransportKeys> filtered =
new HashMap<ContactId, TransportKeys>();