Broadcast events after committing transactions.

This commit is contained in:
akwizgran
2016-02-11 14:49:41 +00:00
parent de8cc50fb4
commit 146dac056d
6 changed files with 88 additions and 85 deletions

View File

@@ -15,10 +15,15 @@ import org.briarproject.api.db.NoSuchMessageException;
import org.briarproject.api.db.NoSuchTransportException;
import org.briarproject.api.db.StorageStatus;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.ContactAddedEvent;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.GroupAddedEvent;
import org.briarproject.api.event.GroupRemovedEvent;
import org.briarproject.api.event.GroupVisibilityUpdatedEvent;
import org.briarproject.api.event.LocalAuthorAddedEvent;
import org.briarproject.api.event.LocalAuthorRemovedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageSharedEvent;
@@ -66,8 +71,6 @@ import static org.briarproject.api.sync.ValidationManager.Validity.UNKNOWN;
import static org.briarproject.api.sync.ValidationManager.Validity.VALID;
import static org.briarproject.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
// TODO: Callers should broadcast events after committing transactions
/**
* An implementation of DatabaseComponent using reentrant read-write locks.
* Depending on the JVM's lock implementation, this implementation may allow
@@ -127,8 +130,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void endTransaction(Transaction transaction) throws DbException {
T txn = txnClass.cast(transaction.unbox());
if (transaction.isComplete()) db.commitTransaction(txn);
else db.abortTransaction(txn);
if (transaction.isComplete()) {
db.commitTransaction(txn);
for (Event e : transaction.getEvents()) eventBus.broadcast(e);
} else {
db.abortTransaction(txn);
}
}
private T unbox(Transaction transaction) {
@@ -143,42 +150,40 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchLocalAuthorException();
if (db.containsContact(txn, remote.getId(), local))
throw new ContactExistsException();
return db.addContact(txn, remote, local);
ContactId c = db.addContact(txn, remote, local);
transaction.attach(new ContactAddedEvent(c));
return c;
}
public void addGroup(Transaction transaction, Group g) throws DbException {
boolean added = false;
T txn = unbox(transaction);
if (!db.containsGroup(txn, g.getId())) {
db.addGroup(txn, g);
added = true;
transaction.attach(new GroupAddedEvent(g));
}
if (added) eventBus.broadcast(new GroupAddedEvent(g));
}
public void addLocalAuthor(Transaction transaction, LocalAuthor a)
throws DbException {
T txn = unbox(transaction);
if (!db.containsLocalAuthor(txn, a.getId()))
if (!db.containsLocalAuthor(txn, a.getId())) {
db.addLocalAuthor(txn, a);
transaction.attach(new LocalAuthorAddedEvent(a.getId()));
}
}
public void addLocalMessage(Transaction transaction, Message m, ClientId c,
Metadata meta, boolean shared) throws DbException {
boolean added = false;
T txn = unbox(transaction);
if (!db.containsGroup(txn, m.getGroupId()))
throw new NoSuchGroupException();
if (!db.containsMessage(txn, m.getId())) {
addMessage(txn, m, VALID, shared, null);
added = true;
transaction.attach(new MessageAddedEvent(m, null));
transaction.attach(new MessageValidatedEvent(m, c, true, true));
if (shared) transaction.attach(new MessageSharedEvent(m));
}
db.mergeMessageMetadata(txn, m.getId(), meta);
if (added) {
eventBus.broadcast(new MessageAddedEvent(m, null));
eventBus.broadcast(new MessageValidatedEvent(m, c, true, true));
if (shared) eventBus.broadcast(new MessageSharedEvent(m));
}
}
/**
@@ -206,13 +211,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void addTransport(Transaction transaction, TransportId t,
int maxLatency) throws DbException {
boolean added = false;
T txn = unbox(transaction);
if (!db.containsTransport(txn, t)) {
db.addTransport(txn, t, maxLatency);
added = true;
transaction.attach(new TransportAddedEvent(t, maxLatency));
}
if (added) eventBus.broadcast(new TransportAddedEvent(t, maxLatency));
}
public void addTransportKeys(Transaction transaction, ContactId c,
@@ -266,9 +269,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
messages.add(db.getRawMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency);
}
if (!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids);
if (messages.isEmpty()) return null;
if (!ids.isEmpty()) eventBus.broadcast(new MessagesSentEvent(c, ids));
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids));
return Collections.unmodifiableList(messages);
}
@@ -309,9 +312,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
messages.add(db.getRawMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency);
}
if (!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids);
if (messages.isEmpty()) return null;
if (!ids.isEmpty()) eventBus.broadcast(new MessagesSentEvent(c, ids));
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids));
return Collections.unmodifiableList(messages);
}
@@ -486,7 +489,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
public void mergeSettings(Transaction transaction, Settings s,
String namespace) throws DbException {
boolean changed = false;
T txn = unbox(transaction);
Settings old = db.getSettings(txn, namespace);
Settings merged = new Settings();
@@ -494,9 +496,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
merged.putAll(s);
if (!merged.equals(old)) {
db.mergeSettings(txn, s, namespace);
changed = true;
transaction.attach(new SettingsUpdatedEvent(namespace));
}
if (changed) eventBus.broadcast(new SettingsUpdatedEvent(namespace));
}
public void receiveAck(Transaction transaction, ContactId c, Ack a)
@@ -511,24 +512,21 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
acked.add(m);
}
}
eventBus.broadcast(new MessagesAckedEvent(c, acked));
transaction.attach(new MessagesAckedEvent(c, acked));
}
public void receiveMessage(Transaction transaction, ContactId c, Message m)
throws DbException {
boolean duplicate, visible;
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
duplicate = db.containsMessage(txn, m.getId());
visible = db.containsVisibleGroup(txn, c, m.getGroupId());
if (visible) {
if (!duplicate) addMessage(txn, m, UNKNOWN, false, c);
if (db.containsVisibleGroup(txn, c, m.getGroupId())) {
if (!db.containsMessage(txn, m.getId())) {
addMessage(txn, m, UNKNOWN, false, c);
transaction.attach(new MessageAddedEvent(m, c));
}
db.raiseAckFlag(txn, c, m.getId());
}
if (visible) {
if (!duplicate) eventBus.broadcast(new MessageAddedEvent(m, c));
eventBus.broadcast(new MessageToAckEvent(c));
transaction.attach(new MessageToAckEvent(c));
}
}
@@ -550,8 +548,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
count++;
}
}
if (ack) eventBus.broadcast(new MessageToAckEvent(c));
if (request) eventBus.broadcast(new MessageToRequestEvent(c));
if (ack) transaction.attach(new MessageToAckEvent(c));
if (request) transaction.attach(new MessageToRequestEvent(c));
}
public void receiveRequest(Transaction transaction, ContactId c, Request r)
@@ -567,7 +565,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
requested = true;
}
}
if (requested) eventBus.broadcast(new MessageRequestedEvent(c));
if (requested) transaction.attach(new MessageRequestedEvent(c));
}
public void removeContact(Transaction transaction, ContactId c)
@@ -576,6 +574,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
db.removeContact(txn, c);
transaction.attach(new ContactRemovedEvent(c));
}
public void removeGroup(Transaction transaction, Group g)
@@ -587,8 +586,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchGroupException();
affected = db.getVisibility(txn, id);
db.removeGroup(txn, id);
eventBus.broadcast(new GroupRemovedEvent(g));
eventBus.broadcast(new GroupVisibilityUpdatedEvent(affected));
transaction.attach(new GroupRemovedEvent(g));
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
}
public void removeLocalAuthor(Transaction transaction, AuthorId a)
@@ -597,6 +596,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsLocalAuthor(txn, a))
throw new NoSuchLocalAuthorException();
db.removeLocalAuthor(txn, a);
transaction.attach(new LocalAuthorRemovedEvent(a));
}
public void removeTransport(Transaction transaction, TransportId t)
@@ -605,7 +605,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
db.removeTransport(txn, t);
eventBus.broadcast(new TransportRemovedEvent(t));
transaction.attach(new TransportRemovedEvent(t));
}
public void setContactStatus(Transaction transaction, ContactId c,
@@ -630,7 +630,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsMessage(txn, m.getId()))
throw new NoSuchMessageException();
db.setMessageShared(txn, m.getId(), shared);
if (shared) eventBus.broadcast(new MessageSharedEvent(m));
if (shared) transaction.attach(new MessageSharedEvent(m));
}
public void setMessageValid(Transaction transaction, Message m, ClientId c,
@@ -639,7 +639,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsMessage(txn, m.getId()))
throw new NoSuchMessageException();
db.setMessageValid(txn, m.getId(), valid);
eventBus.broadcast(new MessageValidatedEvent(m, c, false, valid));
transaction.attach(new MessageValidatedEvent(m, c, false, valid));
}
public void setReorderingWindow(Transaction transaction, ContactId c,
@@ -665,8 +665,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (visible && !wasVisible) db.addVisibility(txn, c, g);
else if (!visible && wasVisible) db.removeVisibility(txn, c, g);
if (visible != wasVisible) {
eventBus.broadcast(new GroupVisibilityUpdatedEvent(
Collections.singletonList(c)));
List<ContactId> affected = Collections.singletonList(c);
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
}
}