Use transactional database API in Bramble.

This commit is contained in:
akwizgran
2018-10-26 09:52:05 +01:00
parent 23e9b119d1
commit e846a13f50
26 changed files with 577 additions and 1309 deletions

View File

@@ -82,13 +82,7 @@ class ClientHelperImpl implements ClientHelper {
@Override
public void addLocalMessage(Message m, BdfDictionary metadata,
boolean shared) throws DbException, FormatException {
Transaction txn = db.startTransaction(false);
try {
addLocalMessage(txn, m, metadata, shared);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false, txn -> addLocalMessage(txn, m, metadata, shared));
}
@Override
@@ -113,15 +107,7 @@ class ClientHelperImpl implements ClientHelper {
@Override
public Message getMessage(MessageId m) throws DbException {
Message message;
Transaction txn = db.startTransaction(true);
try {
message = getMessage(txn, m);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return message;
return db.transactionWithResult(true, txn -> getMessage(txn, m));
}
@Override
@@ -132,15 +118,7 @@ class ClientHelperImpl implements ClientHelper {
@Override
public BdfList getMessageAsList(MessageId m) throws DbException,
FormatException {
BdfList list;
Transaction txn = db.startTransaction(true);
try {
list = getMessageAsList(txn, m);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return list;
return db.transactionWithResult(true, txn -> getMessageAsList(txn, m));
}
@Override
@@ -152,15 +130,8 @@ class ClientHelperImpl implements ClientHelper {
@Override
public BdfDictionary getGroupMetadataAsDictionary(GroupId g)
throws DbException, FormatException {
BdfDictionary dictionary;
Transaction txn = db.startTransaction(true);
try {
dictionary = getGroupMetadataAsDictionary(txn, g);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return dictionary;
return db.transactionWithResult(true, txn ->
getGroupMetadataAsDictionary(txn, g));
}
@Override
@@ -173,15 +144,8 @@ class ClientHelperImpl implements ClientHelper {
@Override
public BdfDictionary getMessageMetadataAsDictionary(MessageId m)
throws DbException, FormatException {
BdfDictionary dictionary;
Transaction txn = db.startTransaction(true);
try {
dictionary = getMessageMetadataAsDictionary(txn, m);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return dictionary;
return db.transactionWithResult(true,
txn -> getMessageMetadataAsDictionary(txn, m));
}
@Override
@@ -194,15 +158,8 @@ class ClientHelperImpl implements ClientHelper {
@Override
public Map<MessageId, BdfDictionary> getMessageMetadataAsDictionary(
GroupId g) throws DbException, FormatException {
Map<MessageId, BdfDictionary> map;
Transaction txn = db.startTransaction(true);
try {
map = getMessageMetadataAsDictionary(txn, g);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return map;
return db.transactionWithResult(true,
txn -> getMessageMetadataAsDictionary(txn, g));
}
@Override
@@ -219,15 +176,8 @@ class ClientHelperImpl implements ClientHelper {
public Map<MessageId, BdfDictionary> getMessageMetadataAsDictionary(
GroupId g, BdfDictionary query) throws DbException,
FormatException {
Map<MessageId, BdfDictionary> map;
Transaction txn = db.startTransaction(true);
try {
map = getMessageMetadataAsDictionary(txn, g, query);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return map;
return db.transactionWithResult(true,
txn -> getMessageMetadataAsDictionary(txn, g, query));
}
@Override
@@ -245,13 +195,7 @@ class ClientHelperImpl implements ClientHelper {
@Override
public void mergeGroupMetadata(GroupId g, BdfDictionary metadata)
throws DbException, FormatException {
Transaction txn = db.startTransaction(false);
try {
mergeGroupMetadata(txn, g, metadata);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false, txn -> mergeGroupMetadata(txn, g, metadata));
}
@Override
@@ -263,13 +207,7 @@ class ClientHelperImpl implements ClientHelper {
@Override
public void mergeMessageMetadata(MessageId m, BdfDictionary metadata)
throws DbException, FormatException {
Transaction txn = db.startTransaction(false);
try {
mergeMessageMetadata(txn, m, metadata);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false, txn -> mergeMessageMetadata(txn, m, metadata));
}
@Override

View File

@@ -13,7 +13,6 @@ import org.briarproject.bramble.api.data.BdfList;
import org.briarproject.bramble.api.db.ContactExistsException;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.identity.Author;
import org.briarproject.bramble.api.identity.LocalAuthor;
import org.briarproject.bramble.api.nullsafety.MethodsNotNullByDefault;
@@ -158,7 +157,8 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
streamWriterFactory.createContactExchangeStreamWriter(out,
alice ? aliceHeaderKey : bobHeaderKey);
RecordWriter recordWriter =
recordWriterFactory.createRecordWriter(streamWriter.getOutputStream());
recordWriterFactory
.createRecordWriter(streamWriter.getOutputStream());
// Derive the nonces to be signed
byte[] aliceNonce = crypto.mac(ALICE_NONCE_LABEL, masterSecret,
@@ -287,19 +287,14 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
private ContactId addContact(Author remoteAuthor, long timestamp,
Map<TransportId, TransportProperties> remoteProperties)
throws DbException {
ContactId contactId;
Transaction txn = db.startTransaction(false);
try {
contactId = contactManager.addContact(txn, remoteAuthor,
return db.transactionWithResult(false, txn -> {
ContactId contactId = contactManager.addContact(txn, remoteAuthor,
localAuthor.getId(), masterSecret, timestamp, alice,
true, true);
transportPropertyManager.addRemoteProperties(txn, contactId,
remoteProperties);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return contactId;
return contactId;
});
}
private void tryToClose(DuplexTransportConnection conn) {

View File

@@ -79,42 +79,21 @@ class ContactManagerImpl implements ContactManager {
public ContactId addContact(Author remote, AuthorId local, SecretKey master,
long timestamp, boolean alice, boolean verified, boolean active)
throws DbException {
ContactId c;
Transaction txn = db.startTransaction(false);
try {
c = addContact(txn, remote, local, master, timestamp, alice,
verified, active);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return c;
return db.transactionWithResult(false,
txn -> addContact(txn, remote, local, master, timestamp, alice,
verified, active));
}
@Override
public Contact getContact(ContactId c) throws DbException {
Contact contact;
Transaction txn = db.startTransaction(true);
try {
contact = db.getContact(txn, c);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return contact;
return db.transactionWithResult(true, txn -> db.getContact(txn, c));
}
@Override
public Contact getContact(AuthorId remoteAuthorId, AuthorId localAuthorId)
throws DbException {
Transaction txn = db.startTransaction(true);
try {
Contact c = getContact(txn, remoteAuthorId, localAuthorId);
db.commitTransaction(txn);
return c;
} finally {
db.endTransaction(txn);
}
return db.transactionWithResult(true,
txn -> getContact(txn, remoteAuthorId, localAuthorId));
}
@Override
@@ -132,14 +111,8 @@ class ContactManagerImpl implements ContactManager {
@Override
public Collection<Contact> getActiveContacts() throws DbException {
Collection<Contact> contacts;
Transaction txn = db.startTransaction(true);
try {
contacts = db.getContacts(txn);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Collection<Contact> contacts =
db.transactionWithResult(true, db::getContacts);
List<Contact> active = new ArrayList<>(contacts.size());
for (Contact c : contacts) if (c.isActive()) active.add(c);
return active;
@@ -147,13 +120,7 @@ class ContactManagerImpl implements ContactManager {
@Override
public void removeContact(ContactId c) throws DbException {
Transaction txn = db.startTransaction(false);
try {
removeContact(txn, c);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false, txn -> removeContact(txn, c));
}
@Override
@@ -188,15 +155,8 @@ class ContactManagerImpl implements ContactManager {
@Override
public boolean contactExists(AuthorId remoteAuthorId,
AuthorId localAuthorId) throws DbException {
boolean exists;
Transaction txn = db.startTransaction(true);
try {
exists = contactExists(txn, remoteAuthorId, localAuthorId);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return exists;
return db.transactionWithResult(true,
txn -> contactExists(txn, remoteAuthorId, localAuthorId));
}
@Override

View File

@@ -67,27 +67,16 @@ class IdentityManagerImpl implements IdentityManager {
LOG.info("No local author to store");
return;
}
Transaction txn = db.startTransaction(false);
try {
db.addLocalAuthor(txn, cached);
db.commitTransaction(txn);
LOG.info("Local author stored");
} finally {
db.endTransaction(txn);
}
db.transaction(false, txn -> db.addLocalAuthor(txn, cached));
LOG.info("Local author stored");
}
@Override
public LocalAuthor getLocalAuthor() throws DbException {
if (cachedAuthor == null) {
Transaction txn = db.startTransaction(true);
try {
cachedAuthor = loadLocalAuthor(txn);
LOG.info("Local author loaded");
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
cachedAuthor =
db.transactionWithResult(true, this::loadLocalAuthor);
LOG.info("Local author loaded");
}
LocalAuthor cached = cachedAuthor;
if (cached == null) throw new AssertionError();

View File

@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.db.DataTooOldException;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.MigrationListener;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.identity.IdentityManager;
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
@@ -115,20 +114,16 @@ class LifecycleManagerImpl implements LifecycleManager, MigrationListener {
dbLatch.countDown();
eventBus.broadcast(new LifecycleEvent(STARTING_SERVICES));
Transaction txn = db.startTransaction(false);
try {
db.transaction(false, txn -> {
for (Client c : clients) {
start = now();
long start1 = now();
c.createLocalState(txn);
if (LOG.isLoggable(FINE)) {
logDuration(LOG, "Starting client "
+ c.getClass().getSimpleName(), start);
+ c.getClass().getSimpleName(), start1);
}
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
});
for (Service s : services) {
start = now();
s.startService();

View File

@@ -142,15 +142,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
@Override
public Map<TransportId, TransportProperties> getLocalProperties()
throws DbException {
Map<TransportId, TransportProperties> local;
Transaction txn = db.startTransaction(true);
try {
local = getLocalProperties(txn);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return local;
return db.transactionWithResult(true, this::getLocalProperties);
}
@Override
@@ -176,9 +168,8 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
try {
TransportProperties p = null;
Transaction txn = db.startTransaction(true);
try {
return db.transactionWithResult(true, txn -> {
TransportProperties p = null;
// Find the latest local update
LatestUpdate latest = findLatest(txn, localGroup.getId(), t,
true);
@@ -188,11 +179,8 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
latest.messageId);
p = parseProperties(message);
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return p == null ? new TransportProperties() : p;
return p == null ? new TransportProperties() : p;
});
} catch (FormatException e) {
throw new DbException(e);
}
@@ -201,16 +189,12 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
@Override
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
Map<ContactId, TransportProperties> remote = new HashMap<>();
Transaction txn = db.startTransaction(true);
try {
return db.transactionWithResult(true, txn -> {
Map<ContactId, TransportProperties> remote = new HashMap<>();
for (Contact c : db.getContacts(txn))
remote.put(c.getId(), getRemoteProperties(txn, c, t));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return remote;
return remote;
});
}
private TransportProperties getRemoteProperties(Transaction txn, Contact c,
@@ -234,23 +218,15 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
@Override
public TransportProperties getRemoteProperties(ContactId c, TransportId t)
throws DbException {
TransportProperties p;
Transaction txn = db.startTransaction(true);
try {
p = getRemoteProperties(txn, db.getContact(txn, c), t);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return p;
return db.transactionWithResult(true,
txn -> getRemoteProperties(txn, db.getContact(txn, c), t));
}
@Override
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
try {
Transaction txn = db.startTransaction(false);
try {
db.transaction(false, txn -> {
// Merge the new properties with any existing properties
TransportProperties merged;
boolean changed;
@@ -287,10 +263,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
db.removeMessage(txn, latest.messageId);
}
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
});
} catch (FormatException e) {
throw new DbException(e);
}

View File

@@ -23,15 +23,8 @@ class SettingsManagerImpl implements SettingsManager {
@Override
public Settings getSettings(String namespace) throws DbException {
Settings s;
Transaction txn = db.startTransaction(true);
try {
s = db.getSettings(txn, namespace);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return s;
return db.transactionWithResult(true,
txn -> db.getSettings(txn, namespace));
}
@Override
@@ -42,12 +35,6 @@ class SettingsManagerImpl implements SettingsManager {
@Override
public void mergeSettings(Settings s, String namespace) throws DbException {
Transaction txn = db.startTransaction(false);
try {
db.mergeSettings(txn, s, namespace);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false, txn -> db.mergeSettings(txn, s, namespace));
}
}

View File

@@ -1,11 +1,11 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.Maybe;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
@@ -230,17 +230,12 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
if (!generateAckQueued.getAndSet(false)) throw new AssertionError();
try {
Ack a;
Transaction txn = db.startTransaction(false);
try {
a = db.generateAck(txn, contactId, MAX_MESSAGE_IDS);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Maybe<Ack> a = db.transactionWithResult(false,
txn -> new Maybe<>(db.generateAck(txn, contactId,
MAX_MESSAGE_IDS)));
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a != null) writerTasks.add(new WriteAck(a));
LOG.info("Generated ack: " + a.isPresent());
if (a.isPresent()) writerTasks.add(new WriteAck(a.get()));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
@@ -275,19 +270,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (!generateBatchQueued.getAndSet(false))
throw new AssertionError();
try {
Collection<Message> b;
Transaction txn = db.startTransaction(false);
try {
b = db.generateRequestedBatch(txn, contactId,
MAX_RECORD_PAYLOAD_BYTES, maxLatency);
Maybe<Collection<Message>>
b = db.transactionWithResult(false, txn -> {
Collection<Message> batch = db.generateRequestedBatch(txn,
contactId, MAX_RECORD_PAYLOAD_BYTES, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return new Maybe<>(batch);
});
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b != null) writerTasks.add(new WriteBatch(b));
LOG.info("Generated batch: " + b.isPresent());
if (b.isPresent()) writerTasks.add(new WriteBatch(b.get()));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
@@ -322,19 +314,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (!generateOfferQueued.getAndSet(false))
throw new AssertionError();
try {
Offer o;
Transaction txn = db.startTransaction(false);
try {
o = db.generateOffer(txn, contactId, MAX_MESSAGE_IDS,
maxLatency);
Maybe<Offer> o = db.transactionWithResult(false, txn -> {
Offer offer = db.generateOffer(txn, contactId,
MAX_MESSAGE_IDS, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return new Maybe<>(offer);
});
if (LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if (o != null) writerTasks.add(new WriteOffer(o));
LOG.info("Generated offer: " + o.isPresent());
if (o.isPresent()) writerTasks.add(new WriteOffer(o.get()));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
@@ -369,17 +357,12 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (!generateRequestQueued.getAndSet(false))
throw new AssertionError();
try {
Request r;
Transaction txn = db.startTransaction(false);
try {
r = db.generateRequest(txn, contactId, MAX_MESSAGE_IDS);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Maybe<Request> r = db.transactionWithResult(false,
txn -> new Maybe<>(db.generateRequest(txn, contactId,
MAX_MESSAGE_IDS)));
if (LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if (r != null) writerTasks.add(new WriteRequest(r));
LOG.info("Generated request: " + r.isPresent());
if (r.isPresent()) writerTasks.add(new WriteRequest(r.get()));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();

View File

@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
@@ -120,13 +119,8 @@ class IncomingSession implements SyncSession, EventListener {
@Override
public void run() {
try {
Transaction txn = db.startTransaction(false);
try {
db.receiveAck(txn, contactId, ack);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false,
txn -> db.receiveAck(txn, contactId, ack));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
@@ -146,13 +140,8 @@ class IncomingSession implements SyncSession, EventListener {
@Override
public void run() {
try {
Transaction txn = db.startTransaction(false);
try {
db.receiveMessage(txn, contactId, message);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false,
txn -> db.receiveMessage(txn, contactId, message));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
@@ -172,13 +161,8 @@ class IncomingSession implements SyncSession, EventListener {
@Override
public void run() {
try {
Transaction txn = db.startTransaction(false);
try {
db.receiveOffer(txn, contactId, offer);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false,
txn -> db.receiveOffer(txn, contactId, offer));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
@@ -198,13 +182,8 @@ class IncomingSession implements SyncSession, EventListener {
@Override
public void run() {
try {
Transaction txn = db.startTransaction(false);
try {
db.receiveRequest(txn, contactId, request);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false,
txn -> db.receiveRequest(txn, contactId, request));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();

View File

@@ -1,11 +1,11 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.Maybe;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
@@ -47,7 +47,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private static final Logger LOG =
Logger.getLogger(SimplexOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE = () -> {};
private static final ThrowingRunnable<IOException> CLOSE = () -> {
};
private final DatabaseComponent db;
private final Executor dbExecutor;
@@ -128,18 +129,13 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Ack a;
Transaction txn = db.startTransaction(false);
try {
a = db.generateAck(txn, contactId, MAX_MESSAGE_IDS);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Maybe<Ack> a = db.transactionWithResult(false,
txn -> new Maybe<>(db.generateAck(txn, contactId,
MAX_MESSAGE_IDS)));
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteAck(a));
LOG.info("Generated ack: " + a.isPresent());
if (a.isPresent()) writerTasks.add(new WriteAck(a.get()));
else decrementOutstandingQueries();
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
@@ -172,19 +168,13 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Collection<Message> b;
Transaction txn = db.startTransaction(false);
try {
b = db.generateBatch(txn, contactId,
MAX_RECORD_PAYLOAD_BYTES, maxLatency);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Maybe<Collection<Message>> b = db.transactionWithResult(false,
txn -> new Maybe<>(db.generateBatch(txn, contactId,
MAX_RECORD_PAYLOAD_BYTES, maxLatency)));
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();
else writerTasks.add(new WriteBatch(b));
LOG.info("Generated batch: " + b.isPresent());
if (b.isPresent()) writerTasks.add(new WriteBatch(b.get()));
else decrementOutstandingQueries();
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();

View File

@@ -1,5 +1,6 @@
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.Pair;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException;
@@ -97,14 +98,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void validateOutstandingMessages() {
try {
Queue<MessageId> unvalidated = new LinkedList<>();
Transaction txn = db.startTransaction(true);
try {
unvalidated.addAll(db.getMessagesToValidate(txn));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Queue<MessageId> unvalidated = new LinkedList<>(
db.transactionWithResult(true, db::getMessagesToValidate));
validateNextMessageAsync(unvalidated);
} catch (DbException e) {
logException(LOG, WARNING, e);
@@ -119,18 +114,14 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void validateNextMessage(Queue<MessageId> unvalidated) {
try {
Message m;
Group g;
Transaction txn = db.startTransaction(true);
try {
Pair<Message, Group> mg = db.transactionWithResult(true, txn -> {
MessageId id = unvalidated.poll();
m = db.getMessage(txn, id);
g = db.getGroup(txn, m.getGroupId());
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
validateMessageAsync(m, g);
if (id == null) throw new AssertionError();
Message m = db.getMessage(txn, id);
Group g = db.getGroup(txn, m.getGroupId());
return new Pair<>(m, g);
});
validateMessageAsync(mg.getFirst(), mg.getSecond());
validateNextMessageAsync(unvalidated);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before validation");
@@ -150,14 +141,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void deliverOutstandingMessages() {
try {
Queue<MessageId> pending = new LinkedList<>();
Transaction txn = db.startTransaction(true);
try {
pending.addAll(db.getPendingMessages(txn));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Queue<MessageId> pending = new LinkedList<>(
db.transactionWithResult(true, db::getPendingMessages));
deliverNextPendingMessageAsync(pending);
} catch (DbException e) {
logException(LOG, WARNING, e);
@@ -172,12 +157,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void deliverNextPendingMessage(Queue<MessageId> pending) {
try {
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> toShare = null;
Queue<MessageId> invalidate = null;
Transaction txn = db.startTransaction(false);
try {
Queue<MessageId> toShare = new LinkedList<>();
Queue<MessageId> invalidate = new LinkedList<>();
db.transaction(false, txn -> {
boolean anyInvalid = false, allDelivered = true;
MessageId id = pending.poll();
if (id == null) throw new AssertionError();
// Check if message is still pending
if (db.getMessageState(txn, id) == PENDING) {
// Check if dependencies are valid and delivered
@@ -189,7 +174,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
if (anyInvalid) {
invalidateMessage(txn, id);
invalidate = getDependentsToInvalidate(txn, id);
addDependentsToInvalidate(txn, id, invalidate);
} else if (allDelivered) {
Message m = db.getMessage(txn, id);
Group g = db.getGroup(txn, m.getGroupId());
@@ -200,22 +185,19 @@ class ValidationManagerImpl implements ValidationManager, Service,
DeliveryResult result =
deliverMessage(txn, m, c, majorVersion, meta);
if (result.valid) {
pending.addAll(getPendingDependents(txn, id));
addPendingDependents(txn, id, pending);
if (result.share) {
db.setMessageShared(txn, id);
toShare = new LinkedList<>(states.keySet());
toShare.addAll(states.keySet());
}
} else {
invalidate = getDependentsToInvalidate(txn, id);
addDependentsToInvalidate(txn, id, invalidate);
}
}
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
if (toShare != null) shareNextMessageAsync(toShare);
});
if (!invalidate.isEmpty()) invalidateNextMessageAsync(invalidate);
if (!toShare.isEmpty()) shareNextMessageAsync(toShare);
deliverNextPendingMessageAsync(pending);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before delivery");
@@ -264,12 +246,11 @@ class ValidationManagerImpl implements ValidationManager, Service,
MessageContext context) {
try {
MessageId id = m.getId();
boolean anyInvalid = false, allDelivered = true;
Queue<MessageId> invalidate = null;
Queue<MessageId> pending = null;
Queue<MessageId> toShare = null;
Transaction txn = db.startTransaction(false);
try {
Queue<MessageId> invalidate = new LinkedList<>();
Queue<MessageId> pending = new LinkedList<>();
Queue<MessageId> toShare = new LinkedList<>();
db.transaction(false, txn -> {
boolean anyInvalid = false, allDelivered = true;
// Check if message has any dependencies
Collection<MessageId> dependencies = context.getDependencies();
if (!dependencies.isEmpty()) {
@@ -285,7 +266,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
if (anyInvalid) {
if (db.getMessageState(txn, id) != INVALID) {
invalidateMessage(txn, id);
invalidate = getDependentsToInvalidate(txn, id);
addDependentsToInvalidate(txn, id, invalidate);
}
} else {
Metadata meta = context.getMetadata();
@@ -294,25 +275,22 @@ class ValidationManagerImpl implements ValidationManager, Service,
DeliveryResult result =
deliverMessage(txn, m, c, majorVersion, meta);
if (result.valid) {
pending = getPendingDependents(txn, id);
addPendingDependents(txn, id, pending);
if (result.share) {
db.setMessageShared(txn, id);
toShare = new LinkedList<>(dependencies);
toShare.addAll(dependencies);
}
} else {
invalidate = getDependentsToInvalidate(txn, id);
addDependentsToInvalidate(txn, id, invalidate);
}
} else {
db.setMessageState(txn, id, PENDING);
}
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
if (invalidate != null) invalidateNextMessageAsync(invalidate);
if (pending != null) deliverNextPendingMessageAsync(pending);
if (toShare != null) shareNextMessageAsync(toShare);
});
if (!invalidate.isEmpty()) invalidateNextMessageAsync(invalidate);
if (!pending.isEmpty()) deliverNextPendingMessageAsync(pending);
if (!toShare.isEmpty()) shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");
} catch (NoSuchGroupException e) {
@@ -342,14 +320,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
@DatabaseExecutor
private Queue<MessageId> getPendingDependents(Transaction txn, MessageId m)
throws DbException {
Queue<MessageId> pending = new LinkedList<>();
private void addPendingDependents(Transaction txn, MessageId m,
Queue<MessageId> pending) throws DbException {
Map<MessageId, State> states = db.getMessageDependents(txn, m);
for (Entry<MessageId, State> e : states.entrySet()) {
if (e.getValue() == PENDING) pending.add(e.getKey());
}
return pending;
}
private void shareOutstandingMessagesAsync() {
@@ -359,14 +335,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void shareOutstandingMessages() {
try {
Queue<MessageId> toShare = new LinkedList<>();
Transaction txn = db.startTransaction(true);
try {
toShare.addAll(db.getMessagesToShare(txn));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Queue<MessageId> toShare = new LinkedList<>(
db.transactionWithResult(true, db::getMessagesToShare));
shareNextMessageAsync(toShare);
} catch (DbException e) {
logException(LOG, WARNING, e);
@@ -387,15 +357,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void shareNextMessage(Queue<MessageId> toShare) {
try {
Transaction txn = db.startTransaction(false);
try {
db.transaction(false, txn -> {
MessageId id = toShare.poll();
if (id == null) throw new AssertionError();
db.setMessageShared(txn, id);
toShare.addAll(db.getMessageDependencies(txn, id).keySet());
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
});
shareNextMessageAsync(toShare);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before sharing");
@@ -416,17 +383,14 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void invalidateNextMessage(Queue<MessageId> invalidate) {
try {
Transaction txn = db.startTransaction(false);
try {
db.transaction(false, txn -> {
MessageId id = invalidate.poll();
if (id == null) throw new AssertionError();
if (db.getMessageState(txn, id) != INVALID) {
invalidateMessage(txn, id);
invalidate.addAll(getDependentsToInvalidate(txn, id));
addDependentsToInvalidate(txn, id, invalidate);
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
});
invalidateNextMessageAsync(invalidate);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before invalidation");
@@ -445,14 +409,12 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
@DatabaseExecutor
private Queue<MessageId> getDependentsToInvalidate(Transaction txn,
MessageId m) throws DbException {
Queue<MessageId> invalidate = new LinkedList<>();
private void addDependentsToInvalidate(Transaction txn,
MessageId m, Queue<MessageId> invalidate) throws DbException {
Map<MessageId, State> states = db.getMessageDependents(txn, m);
for (Entry<MessageId, State> e : states.entrySet()) {
if (e.getValue() != INVALID) invalidate.add(e.getKey());
}
return invalidate;
}
@Override
@@ -472,14 +434,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
@DatabaseExecutor
private void loadGroupAndValidate(Message m) {
try {
Group g;
Transaction txn = db.startTransaction(true);
try {
g = db.getGroup(txn, m.getGroupId());
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
Group g = db.transactionWithResult(true,
txn -> db.getGroup(txn, m.getGroupId()));
validateMessageAsync(m, g);
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");

View File

@@ -72,8 +72,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
for (DuplexPluginFactory f : pluginConfig.getDuplexFactories())
transports.put(f.getId(), f.getMaxLatency());
try {
Transaction txn = db.startTransaction(false);
try {
db.transaction(false, txn -> {
for (Contact c : db.getContacts(txn))
if (c.isActive()) activeContacts.put(c.getId(), true);
for (Entry<TransportId, Integer> e : transports.entrySet())
@@ -85,10 +84,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
managers.put(e.getKey(), m);
m.start(txn);
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
});
} catch (DbException e) {
throw new ServiceException(e);
}
@@ -141,15 +137,8 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
if (LOG.isLoggable(INFO)) LOG.info("No key manager for " + t);
return null;
}
StreamContext ctx;
Transaction txn = db.startTransaction(false);
try {
ctx = m.getStreamContext(txn, c);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return ctx;
return db.transactionWithResult(false,
txn -> m.getStreamContext(txn, c));
}
@Override
@@ -160,15 +149,8 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
if (LOG.isLoggable(INFO)) LOG.info("No key manager for " + t);
return null;
}
StreamContext ctx;
Transaction txn = db.startTransaction(false);
try {
ctx = m.getStreamContext(txn, tag);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
return ctx;
return db.transactionWithResult(false,
txn -> m.getStreamContext(txn, tag));
}
@Override

View File

@@ -162,13 +162,7 @@ class TransportKeyManagerImpl implements TransportKeyManager {
private void rotateKeys() {
dbExecutor.execute(() -> {
try {
Transaction txn = db.startTransaction(false);
try {
rotateKeys(txn);
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
db.transaction(false, this::rotateKeys);
} catch (DbException e) {
logException(LOG, WARNING, e);
}

View File

@@ -123,16 +123,12 @@ class ClientVersioningManagerImpl implements ClientVersioningManager, Client,
List<ClientVersion> versions = new ArrayList<>(clients);
Collections.sort(versions);
try {
Transaction txn = db.startTransaction(false);
try {
db.transaction(false, txn -> {
if (updateClientVersions(txn, versions)) {
for (Contact c : db.getContacts(txn))
clientVersionsUpdated(txn, c, versions);
}
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
}
});
} catch (DbException e) {
throw new ServiceException(e);
}