Separated event infrastructure from DB.

This commit is contained in:
akwizgran
2014-10-03 09:44:54 +01:00
parent 6a4ea49786
commit 8b8df435a5
29 changed files with 368 additions and 244 deletions

View File

@@ -15,7 +15,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
@@ -42,8 +41,7 @@ import org.briarproject.api.db.NoSuchSubscriptionException;
import org.briarproject.api.db.NoSuchTransportException;
import org.briarproject.api.event.ContactAddedEvent;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.LocalAuthorAddedEvent;
import org.briarproject.api.event.LocalAuthorRemovedEvent;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
@@ -95,21 +93,21 @@ DatabaseCleaner.Callback {
private final Database<T> db;
private final DatabaseCleaner cleaner;
private final EventBus eventBus;
private final ShutdownManager shutdown;
private final ReentrantReadWriteLock lock =
new ReentrantReadWriteLock(true);
private final Collection<EventListener> listeners =
new CopyOnWriteArrayList<EventListener>();
private boolean open = false; // Locking: lock.writeLock
private int shutdownHandle = -1; // Locking: lock.writeLock
@Inject
DatabaseComponentImpl(Database<T> db, DatabaseCleaner cleaner,
ShutdownManager shutdown) {
EventBus eventBus, ShutdownManager shutdown) {
this.db = db;
this.cleaner = cleaner;
this.eventBus = eventBus;
this.shutdown = shutdown;
}
@@ -158,19 +156,6 @@ DatabaseCleaner.Callback {
}
}
public void addListener(EventListener l) {
listeners.add(l);
}
public void removeListener(EventListener l) {
listeners.remove(l);
}
/** Notifies all listeners of a database event. */
private void callListeners(Event e) {
for(EventListener l : listeners) l.eventOccurred(e);
}
public ContactId addContact(Author remote, AuthorId local)
throws DbException {
ContactId c;
@@ -191,7 +176,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
callListeners(new ContactAddedEvent(c));
eventBus.broadcast(new ContactAddedEvent(c));
return c;
}
@@ -231,7 +216,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(added) callListeners(new SubscriptionAddedEvent(g));
if(added) eventBus.broadcast(new SubscriptionAddedEvent(g));
return added;
}
@@ -251,18 +236,18 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
callListeners(new LocalAuthorAddedEvent(a.getId()));
eventBus.broadcast(new LocalAuthorAddedEvent(a.getId()));
}
public void addLocalMessage(Message m) throws DbException {
boolean duplicate;
boolean duplicate, subscribed;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
duplicate = db.containsMessage(txn, m.getId());
if(!duplicate && db.containsGroup(txn, m.getGroup().getId()))
addMessage(txn, m, null);
subscribed = db.containsGroup(txn, m.getGroup().getId());
if(!duplicate && subscribed) addMessage(txn, m, null);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -271,7 +256,8 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(!duplicate) callListeners(new MessageAddedEvent(m.getGroup(), null));
if(!duplicate && subscribed)
eventBus.broadcast(new MessageAddedEvent(m.getGroup(), null));
}
/**
@@ -344,7 +330,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(added) callListeners(new TransportAddedEvent(t, maxLatency));
if(added) eventBus.broadcast(new TransportAddedEvent(t, maxLatency));
return added;
}
@@ -1062,7 +1048,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(changed) callListeners(new LocalTransportsUpdatedEvent());
if(changed) eventBus.broadcast(new LocalTransportsUpdatedEvent());
}
public void mergeSettings(Settings s) throws DbException {
@@ -1083,7 +1069,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(changed) callListeners(new SettingsUpdatedEvent());
if(changed) eventBus.broadcast(new SettingsUpdatedEvent());
}
public void receiveAck(ContactId c, Ack a) throws DbException {
@@ -1108,7 +1094,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
callListeners(new MessagesAckedEvent(c, acked));
eventBus.broadcast(new MessagesAckedEvent(c, acked));
}
public void receiveMessage(ContactId c, Message m) throws DbException {
@@ -1135,8 +1121,8 @@ DatabaseCleaner.Callback {
}
if(visible) {
if(!duplicate)
callListeners(new MessageAddedEvent(m.getGroup(), c));
callListeners(new MessageToAckEvent(c));
eventBus.broadcast(new MessageAddedEvent(m.getGroup(), c));
eventBus.broadcast(new MessageToAckEvent(c));
}
}
@@ -1168,8 +1154,8 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(ack) callListeners(new MessageToAckEvent(c));
if(request) callListeners(new MessageToRequestEvent(c));
if(ack) eventBus.broadcast(new MessageToAckEvent(c));
if(request) eventBus.broadcast(new MessageToRequestEvent(c));
}
public void receiveRequest(ContactId c, Request r) throws DbException {
@@ -1195,7 +1181,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(requested) callListeners(new MessageRequestedEvent(c));
if(requested) eventBus.broadcast(new MessageRequestedEvent(c));
}
public void receiveRetentionAck(ContactId c, RetentionAck a)
@@ -1236,7 +1222,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(updated) callListeners(new RemoteRetentionTimeUpdatedEvent(c));
if(updated) eventBus.broadcast(new RemoteRetentionTimeUpdatedEvent(c));
}
public void receiveSubscriptionAck(ContactId c, SubscriptionAck a)
@@ -1276,7 +1262,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
if(updated) callListeners(new RemoteSubscriptionsUpdatedEvent(c));
if(updated) eventBus.broadcast(new RemoteSubscriptionsUpdatedEvent(c));
}
public void receiveTransportAck(ContactId c, TransportAck a)
@@ -1322,7 +1308,7 @@ DatabaseCleaner.Callback {
lock.writeLock().unlock();
}
if(updated)
callListeners(new RemoteTransportsUpdatedEvent(c, u.getId()));
eventBus.broadcast(new RemoteTransportsUpdatedEvent(c, u.getId()));
}
public void removeContact(ContactId c) throws DbException {
@@ -1343,7 +1329,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
callListeners(new ContactRemovedEvent(c));
eventBus.broadcast(new ContactRemovedEvent(c));
}
public void removeGroup(Group g) throws DbException {
@@ -1365,8 +1351,8 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
callListeners(new SubscriptionRemovedEvent(g));
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
eventBus.broadcast(new SubscriptionRemovedEvent(g));
eventBus.broadcast(new LocalSubscriptionsUpdatedEvent(affected));
}
public void removeLocalAuthor(AuthorId a) throws DbException {
@@ -1391,8 +1377,9 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
for(ContactId c : affected) callListeners(new ContactRemovedEvent(c));
callListeners(new LocalAuthorRemovedEvent(a));
for(ContactId c : affected)
eventBus.broadcast(new ContactRemovedEvent(c));
eventBus.broadcast(new LocalAuthorRemovedEvent(a));
}
public void removeTransport(TransportId t) throws DbException {
@@ -1411,7 +1398,7 @@ DatabaseCleaner.Callback {
} finally {
lock.writeLock().unlock();
}
callListeners(new TransportRemovedEvent(t));
eventBus.broadcast(new TransportRemovedEvent(t));
}
public void setConnectionWindow(ContactId c, TransportId t, long period,
@@ -1526,7 +1513,7 @@ DatabaseCleaner.Callback {
lock.writeLock().unlock();
}
if(!affected.isEmpty())
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
eventBus.broadcast(new LocalSubscriptionsUpdatedEvent(affected));
}
public void setVisibleToAll(GroupId g, boolean all) throws DbException {
@@ -1559,7 +1546,7 @@ DatabaseCleaner.Callback {
lock.writeLock().unlock();
}
if(!affected.isEmpty())
callListeners(new LocalSubscriptionsUpdatedEvent(affected));
eventBus.broadcast(new LocalSubscriptionsUpdatedEvent(affected));
}
public void checkFreeSpaceAndClean() throws DbException {
@@ -1603,7 +1590,7 @@ DatabaseCleaner.Callback {
lock.writeLock().unlock();
}
if(expired.isEmpty()) return false;
callListeners(new MessageExpiredEvent());
eventBus.broadcast(new MessageExpiredEvent());
return true;
}

View File

@@ -15,6 +15,7 @@ import javax.inject.Singleton;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseConfig;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.lifecycle.ShutdownManager;
import org.briarproject.api.system.FileUtils;
@@ -38,6 +39,7 @@ public class DatabaseModule extends AbstractModule {
policy);
}
@Override
protected void configure() {
bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class);
}
@@ -50,8 +52,10 @@ public class DatabaseModule extends AbstractModule {
@Provides @Singleton
DatabaseComponent getDatabaseComponent(Database<Connection> db,
DatabaseCleaner cleaner, ShutdownManager shutdown) {
return new DatabaseComponentImpl<Connection>(db, cleaner, shutdown);
DatabaseCleaner cleaner, EventBus eventBus,
ShutdownManager shutdown) {
return new DatabaseComponentImpl<Connection>(db, cleaner, eventBus,
shutdown);
}
@Provides @Singleton @DatabaseExecutor

View File

@@ -0,0 +1,26 @@
package org.briarproject.event;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
class EventBusImpl implements EventBus {
private final Collection<EventListener> listeners =
new CopyOnWriteArrayList<EventListener>();
public void addListener(EventListener l) {
listeners.add(l);
}
public void removeListener(EventListener l) {
listeners.remove(l);
}
public void broadcast(Event e) {
for(EventListener l : listeners) l.eventOccurred(e);
}
}

View File

@@ -0,0 +1,14 @@
package org.briarproject.event;
import org.briarproject.api.event.EventBus;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
public class EventModule extends AbstractModule {
@Override
protected void configure() {
bind(EventBus.class).to(EventBusImpl.class).in(Singleton.class);
}
}

View File

@@ -22,6 +22,7 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
@@ -72,6 +73,7 @@ abstract class DuplexConnection implements EventListener {
};
protected final DatabaseComponent db;
protected final EventBus eventBus;
protected final ConnectionRegistry connRegistry;
protected final ConnectionReaderFactory connReaderFactory;
protected final ConnectionWriterFactory connWriterFactory;
@@ -92,7 +94,7 @@ abstract class DuplexConnection implements EventListener {
DuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
EventBus eventBus, ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
@@ -102,6 +104,7 @@ abstract class DuplexConnection implements EventListener {
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.eventBus = eventBus;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
@@ -218,7 +221,7 @@ abstract class DuplexConnection implements EventListener {
void write() {
connRegistry.registerConnection(contactId, transportId);
db.addListener(this);
eventBus.addListener(this);
try {
OutputStream out = createConnectionWriter().getOutputStream();
writer = packetWriterFactory.createPacketWriter(out, true);
@@ -260,7 +263,7 @@ abstract class DuplexConnection implements EventListener {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
}
db.removeListener(this);
eventBus.removeListener(this);
connRegistry.unregisterConnection(contactId, transportId);
}

View File

@@ -11,6 +11,7 @@ import org.briarproject.api.crypto.CryptoExecutor;
import org.briarproject.api.crypto.KeyManager;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
@@ -29,6 +30,7 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final EventBus eventBus;
private final KeyManager keyManager;
private final ConnectionRegistry connRegistry;
private final ConnectionReaderFactory connReaderFactory;
@@ -40,7 +42,8 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
DuplexConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
KeyManager keyManager, ConnectionRegistry connRegistry,
EventBus eventBus, KeyManager keyManager,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
@@ -49,6 +52,7 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.eventBus = eventBus;
this.keyManager = keyManager;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
@@ -60,7 +64,7 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
public void createIncomingConnection(ConnectionContext ctx,
DuplexTransportConnection transport) {
final DuplexConnection conn = new IncomingDuplexConnection(dbExecutor,
cryptoExecutor, messageVerifier, db, connRegistry,
cryptoExecutor, messageVerifier, db, eventBus, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
Runnable write = new Runnable() {
@@ -85,7 +89,7 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
return;
}
final DuplexConnection conn = new OutgoingDuplexConnection(dbExecutor,
cryptoExecutor, messageVerifier, db, connRegistry,
cryptoExecutor, messageVerifier, db, eventBus, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
Runnable write = new Runnable() {

View File

@@ -6,6 +6,7 @@ import java.io.OutputStream;
import java.util.concurrent.Executor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
@@ -21,15 +22,15 @@ class IncomingDuplexConnection extends DuplexConnection {
IncomingDuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
EventBus eventBus, ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory,
ConnectionContext ctx, DuplexTransportConnection transport) {
super(dbExecutor, cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
super(dbExecutor, cryptoExecutor, messageVerifier, db, eventBus,
connRegistry, connReaderFactory, connWriterFactory,
packetReaderFactory, packetWriterFactory, ctx, transport);
}
@Override

View File

@@ -6,6 +6,7 @@ import java.io.OutputStream;
import java.util.concurrent.Executor;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriterFactory;
@@ -21,15 +22,15 @@ class OutgoingDuplexConnection extends DuplexConnection {
OutgoingDuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
EventBus eventBus, ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory, ConnectionContext ctx,
DuplexTransportConnection transport) {
super(dbExecutor, cryptoExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, packetReaderFactory,
packetWriterFactory, ctx, transport);
super(dbExecutor, cryptoExecutor, messageVerifier, db, eventBus,
connRegistry, connReaderFactory, connWriterFactory,
packetReaderFactory, packetWriterFactory, ctx, transport);
}
@Override

View File

@@ -23,6 +23,7 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.TransportAddedEvent;
import org.briarproject.api.event.TransportRemovedEvent;
@@ -44,6 +45,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
private final CryptoComponent crypto;
private final DatabaseComponent db;
private final EventBus eventBus;
private final ConnectionRecogniser connectionRecogniser;
private final Clock clock;
private final Timer timer;
@@ -56,10 +58,11 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
@Inject
KeyManagerImpl(CryptoComponent crypto, DatabaseComponent db,
ConnectionRecogniser connectionRecogniser, Clock clock,
Timer timer) {
EventBus eventBus, ConnectionRecogniser connectionRecogniser,
Clock clock, Timer timer) {
this.crypto = crypto;
this.db = db;
this.eventBus = eventBus;
this.connectionRecogniser = connectionRecogniser;
this.clock = clock;
this.timer = timer;
@@ -70,7 +73,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
}
public synchronized boolean start() {
db.addListener(this);
eventBus.addListener(this);
// Load the temporary secrets and transport latencies from the database
Collection<TemporarySecret> secrets;
try {
@@ -213,7 +216,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
}
public synchronized boolean stop() {
db.removeListener(this);
eventBus.removeListener(this);
timer.cancel();
connectionRecogniser.removeSecrets();
maxLatencies.clear();
@@ -290,6 +293,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
connectionRecogniser.addSecret(s3);
}
@Override
public synchronized void run() {
// Rebuild the maps because we may be running a whole period late
Collection<TemporarySecret> secrets = new ArrayList<TemporarySecret>();
@@ -399,6 +403,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
this.event = event;
}
@Override
public void run() {
ContactId c = event.getContactId();
connectionRecogniser.removeSecrets(c);
@@ -418,6 +423,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
this.event = event;
}
@Override
public void run() {
synchronized(KeyManagerImpl.this) {
maxLatencies.put(event.getTransportId(), event.getMaxLatency());
@@ -433,6 +439,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
this.event = event;
}
@Override
public void run() {
TransportId t = event.getTransportId();
connectionRecogniser.removeSecrets(t);