mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-13 19:29:06 +01:00
Allow event executor tasks to be attached to transactions.
This commit is contained in:
@@ -7,11 +7,14 @@ import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
|
||||
import org.briarproject.bramble.api.contact.event.ContactStatusChangedEvent;
|
||||
import org.briarproject.bramble.api.contact.event.ContactVerifiedEvent;
|
||||
import org.briarproject.bramble.api.crypto.SecretKey;
|
||||
import org.briarproject.bramble.api.db.CommitAction;
|
||||
import org.briarproject.bramble.api.db.CommitAction.Visitor;
|
||||
import org.briarproject.bramble.api.db.ContactExistsException;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DbCallable;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.db.DbRunnable;
|
||||
import org.briarproject.bramble.api.db.EventAction;
|
||||
import org.briarproject.bramble.api.db.Metadata;
|
||||
import org.briarproject.bramble.api.db.MigrationListener;
|
||||
import org.briarproject.bramble.api.db.NoSuchContactException;
|
||||
@@ -20,9 +23,10 @@ import org.briarproject.bramble.api.db.NoSuchLocalAuthorException;
|
||||
import org.briarproject.bramble.api.db.NoSuchMessageException;
|
||||
import org.briarproject.bramble.api.db.NoSuchTransportException;
|
||||
import org.briarproject.bramble.api.db.NullableDbCallable;
|
||||
import org.briarproject.bramble.api.db.TaskAction;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.event.Event;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.event.EventExecutor;
|
||||
import org.briarproject.bramble.api.identity.Author;
|
||||
import org.briarproject.bramble.api.identity.AuthorId;
|
||||
import org.briarproject.bramble.api.identity.LocalAuthor;
|
||||
@@ -64,6 +68,7 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.logging.Logger;
|
||||
@@ -92,25 +97,29 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
private final Database<T> db;
|
||||
private final Class<T> txnClass;
|
||||
private final EventBus eventBus;
|
||||
private final ShutdownManager shutdown;
|
||||
private final Executor eventExecutor;
|
||||
private final ShutdownManager shutdownManager;
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private final ReentrantReadWriteLock lock =
|
||||
new ReentrantReadWriteLock(true);
|
||||
private final Visitor visitor = new CommitActionVisitor();
|
||||
|
||||
@Inject
|
||||
DatabaseComponentImpl(Database<T> db, Class<T> txnClass, EventBus eventBus,
|
||||
ShutdownManager shutdown) {
|
||||
@EventExecutor Executor eventExecutor,
|
||||
ShutdownManager shutdownManager) {
|
||||
this.db = db;
|
||||
this.txnClass = txnClass;
|
||||
this.eventBus = eventBus;
|
||||
this.shutdown = shutdown;
|
||||
this.eventExecutor = eventExecutor;
|
||||
this.shutdownManager = shutdownManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean open(SecretKey key, @Nullable MigrationListener listener)
|
||||
throws DbException {
|
||||
boolean reopened = db.open(key, listener);
|
||||
shutdown.addShutdownHook(() -> {
|
||||
shutdownManager.addShutdownHook(() -> {
|
||||
try {
|
||||
close();
|
||||
} catch (DbException e) {
|
||||
@@ -161,7 +170,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
try {
|
||||
T txn = txnClass.cast(transaction.unbox());
|
||||
if (transaction.isCommitted()) {
|
||||
for (Event e : transaction.getEvents()) eventBus.broadcast(e);
|
||||
for (CommitAction a : transaction.getActions())
|
||||
a.accept(visitor);
|
||||
} else {
|
||||
db.abortTransaction(txn);
|
||||
}
|
||||
@@ -976,4 +986,17 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
db.updateTransportKeys(txn, ks);
|
||||
}
|
||||
}
|
||||
|
||||
private class CommitActionVisitor implements Visitor {
|
||||
|
||||
@Override
|
||||
public void visit(EventAction a) {
|
||||
eventBus.broadcast(a.getEvent());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void visit(TaskAction a) {
|
||||
eventExecutor.execute(a.getTask());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,11 +3,13 @@ package org.briarproject.bramble.db;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DatabaseConfig;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.event.EventExecutor;
|
||||
import org.briarproject.bramble.api.lifecycle.ShutdownManager;
|
||||
import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.inject.Singleton;
|
||||
|
||||
@@ -27,8 +29,9 @@ public class DatabaseModule {
|
||||
@Provides
|
||||
@Singleton
|
||||
DatabaseComponent provideDatabaseComponent(Database<Connection> db,
|
||||
EventBus eventBus, ShutdownManager shutdown) {
|
||||
EventBus eventBus, @EventExecutor Executor eventExecutor,
|
||||
ShutdownManager shutdownManager) {
|
||||
return new DatabaseComponentImpl<>(db, Connection.class, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
@@ -87,9 +88,10 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private final Database<Object> database = context.mock(Database.class);
|
||||
private final ShutdownManager shutdown =
|
||||
private final ShutdownManager shutdownManager =
|
||||
context.mock(ShutdownManager.class);
|
||||
private final EventBus eventBus = context.mock(EventBus.class);
|
||||
private final Executor eventExecutor = context.mock(Executor.class);
|
||||
|
||||
private final SecretKey key = getSecretKey();
|
||||
private final Object txn = new Object();
|
||||
@@ -132,9 +134,10 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
}
|
||||
|
||||
private DatabaseComponent createDatabaseComponent(Database<Object> database,
|
||||
EventBus eventBus, ShutdownManager shutdown) {
|
||||
EventBus eventBus, Executor eventExecutor,
|
||||
ShutdownManager shutdownManager) {
|
||||
return new DatabaseComponentImpl<>(database, Object.class, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -144,7 +147,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
// open()
|
||||
oneOf(database).open(key, null);
|
||||
will(returnValue(false));
|
||||
oneOf(shutdown).addShutdownHook(with(any(Runnable.class)));
|
||||
oneOf(shutdownManager).addShutdownHook(with(any(Runnable.class)));
|
||||
will(returnValue(shutdownHandle));
|
||||
// startTransaction()
|
||||
oneOf(database).startTransaction();
|
||||
@@ -207,7 +210,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).close();
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
assertFalse(db.open(key, null));
|
||||
db.transaction(false, transaction -> {
|
||||
@@ -238,7 +241,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).abortTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.addLocalMessage(transaction, message, metadata, true));
|
||||
@@ -263,7 +266,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessageSharedEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.addLocalMessage(transaction, message, metadata, true));
|
||||
@@ -281,7 +284,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
exactly(17).of(database).abortTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
@@ -438,7 +441,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
exactly(3).of(database).abortTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
@@ -481,7 +484,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
will(returnValue(true));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
@@ -565,7 +568,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
will(returnValue(true));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
@@ -668,7 +671,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
exactly(5).of(database).abortTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
@@ -727,7 +730,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
Ack a = db.generateAck(transaction, contactId, 123);
|
||||
@@ -761,7 +764,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessagesSentEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertEquals(messages, db.generateBatch(transaction, contactId,
|
||||
@@ -786,7 +789,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
Offer o = db.generateOffer(transaction, contactId, 123, maxLatency);
|
||||
@@ -810,7 +813,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
Request r = db.generateRequest(transaction, contactId, 123);
|
||||
@@ -844,7 +847,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessagesSentEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
assertEquals(messages, db.generateRequestedBatch(transaction,
|
||||
@@ -865,7 +868,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessagesAckedEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
Ack a = new Ack(singletonList(messageId));
|
||||
@@ -903,7 +906,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
// Receive the message twice
|
||||
@@ -931,7 +934,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.receiveMessage(transaction, contactId, message));
|
||||
@@ -949,7 +952,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.receiveMessage(transaction, contactId, message));
|
||||
@@ -989,7 +992,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessageToRequestEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
Offer o = new Offer(asList(messageId, messageId1,
|
||||
messageId2, messageId3));
|
||||
@@ -1012,7 +1015,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(eventBus).broadcast(with(any(MessageRequestedEvent.class)));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
Request r = new Request(singletonList(messageId));
|
||||
db.transaction(false, transaction ->
|
||||
@@ -1042,7 +1045,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
GroupVisibilityUpdatedEvent.class, 0));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.setGroupVisibility(transaction, contactId, groupId,
|
||||
@@ -1076,7 +1079,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
GroupVisibilityUpdatedEvent.class, 0));
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.setGroupVisibility(transaction, contactId, groupId,
|
||||
@@ -1102,7 +1105,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction ->
|
||||
db.setGroupVisibility(transaction, contactId, groupId,
|
||||
@@ -1132,7 +1135,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
db.updateTransportKeys(transaction, keys);
|
||||
@@ -1171,7 +1174,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(true, transaction -> {
|
||||
// With visible group - return stored status
|
||||
@@ -1221,7 +1224,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).commitTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(true, transaction -> {
|
||||
// With visible group - return stored status
|
||||
@@ -1287,7 +1290,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
}});
|
||||
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
db.transaction(false, transaction -> {
|
||||
// First merge should broadcast an event
|
||||
@@ -1330,7 +1333,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
}});
|
||||
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
assertNotNull(db.startTransaction(firstTxnReadOnly));
|
||||
db.startTransaction(secondTxnReadOnly);
|
||||
@@ -1351,7 +1354,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
}});
|
||||
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
@@ -1380,7 +1383,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
}});
|
||||
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
@@ -1401,7 +1404,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
// open()
|
||||
oneOf(database).open(key, null);
|
||||
will(returnValue(false));
|
||||
oneOf(shutdown).addShutdownHook(with(any(Runnable.class)));
|
||||
oneOf(shutdownManager).addShutdownHook(with(any(Runnable.class)));
|
||||
will(returnValue(shutdownHandle));
|
||||
// startTransaction()
|
||||
oneOf(database).startTransaction();
|
||||
@@ -1441,7 +1444,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
oneOf(database).close();
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
shutdown);
|
||||
eventExecutor, shutdownManager);
|
||||
|
||||
assertFalse(db.open(key, null));
|
||||
db.transaction(false, transaction -> {
|
||||
|
||||
Reference in New Issue
Block a user