Inject dbExecutor as ExecutorService. #189

This commit is contained in:
akwizgran
2015-12-18 11:28:18 +00:00
parent 766179ac97
commit b7fe802d5d
3 changed files with 56 additions and 19 deletions

View File

@@ -55,8 +55,13 @@ public class DatabaseModule extends AbstractModule {
} }
@Provides @Singleton @DatabaseExecutor @Provides @Singleton @DatabaseExecutor
Executor getDatabaseExecutor(LifecycleManager lifecycleManager) { ExecutorService getDatabaseExecutor(LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(databaseExecutor); lifecycleManager.registerForShutdown(databaseExecutor);
return databaseExecutor; return databaseExecutor;
} }
@Provides @Singleton @DatabaseExecutor
Executor getDatabaseExecutor(@DatabaseExecutor ExecutorService dbExecutor) {
return dbExecutor;
}
} }

View File

@@ -22,7 +22,7 @@ import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.inject.Inject; import javax.inject.Inject;
@@ -36,7 +36,7 @@ class KeyManagerImpl implements KeyManager, EventListener {
private final DatabaseComponent db; private final DatabaseComponent db;
private final CryptoComponent crypto; private final CryptoComponent crypto;
private final Executor dbExecutor; private final ExecutorService dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final Timer timer; private final Timer timer;
private final Clock clock; private final Clock clock;
@@ -44,7 +44,7 @@ class KeyManagerImpl implements KeyManager, EventListener {
@Inject @Inject
KeyManagerImpl(DatabaseComponent db, CryptoComponent crypto, KeyManagerImpl(DatabaseComponent db, CryptoComponent crypto,
@DatabaseExecutor Executor dbExecutor, EventBus eventBus, @DatabaseExecutor ExecutorService dbExecutor, EventBus eventBus,
Timer timer, Clock clock) { Timer timer, Clock clock) {
this.db = db; this.db = db;
this.crypto = crypto; this.crypto = crypto;

View File

@@ -17,7 +17,10 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.Executor; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -33,7 +36,7 @@ class TransportKeyManager extends TimerTask {
private final DatabaseComponent db; private final DatabaseComponent db;
private final CryptoComponent crypto; private final CryptoComponent crypto;
private final Executor dbExecutor; private final ExecutorService dbExecutor;
private final Timer timer; private final Timer timer;
private final Clock clock; private final Clock clock;
private final TransportId transportId; private final TransportId transportId;
@@ -46,7 +49,7 @@ class TransportKeyManager extends TimerTask {
private final Map<ContactId, MutableTransportKeys> keys; private final Map<ContactId, MutableTransportKeys> keys;
TransportKeyManager(DatabaseComponent db, CryptoComponent crypto, TransportKeyManager(DatabaseComponent db, CryptoComponent crypto,
Executor dbExecutor, Timer timer, Clock clock, ExecutorService dbExecutor, Timer timer, Clock clock,
TransportId transportId, long maxLatency) { TransportId transportId, long maxLatency) {
this.db = db; this.db = db;
this.crypto = crypto; this.crypto = crypto;
@@ -119,7 +122,8 @@ class TransportKeyManager extends TimerTask {
} }
} }
private void saveTransportKeys(final Map<ContactId, TransportKeys> rotated) { private void saveTransportKeys(
final Map<ContactId, TransportKeys> rotated) {
dbExecutor.execute(new Runnable() { dbExecutor.execute(new Runnable() {
public void run() { public void run() {
try { try {
@@ -159,6 +163,7 @@ class TransportKeyManager extends TimerTask {
StreamContext getStreamContext(ContactId c) { StreamContext getStreamContext(ContactId c) {
StreamContext ctx; StreamContext ctx;
Future<Void> saved;
lock.lock(); lock.lock();
try { try {
// Look up the outgoing keys for the contact // Look up the outgoing keys for the contact
@@ -170,30 +175,44 @@ class TransportKeyManager extends TimerTask {
outKeys.getHeaderKey(), outKeys.getStreamCounter()); outKeys.getHeaderKey(), outKeys.getStreamCounter());
// Increment the stream counter and write it back to the DB // Increment the stream counter and write it back to the DB
outKeys.incrementStreamCounter(); outKeys.incrementStreamCounter();
saveIncrementedStreamCounter(c, outKeys.getRotationPeriod()); saved = saveIncrementedStreamCounter(c,
outKeys.getRotationPeriod());
} finally { } finally {
lock.unlock(); lock.unlock();
} }
// TODO: Wait for save to complete, return null if it fails // Wait for the save to complete before returning the stream context
try {
saved.get();
} catch (InterruptedException e) {
LOG.warning("Interrupted while incrementing stream counter");
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
return null;
}
return ctx; return ctx;
} }
private void saveIncrementedStreamCounter(final ContactId c, private Future<Void> saveIncrementedStreamCounter(final ContactId c,
final long rotationPeriod) { final long rotationPeriod) {
dbExecutor.execute(new Runnable() { return dbExecutor.submit(new Callable<Void>() {
public void run() { public Void call() {
try { try {
db.incrementStreamCounter(c, transportId, rotationPeriod); db.incrementStreamCounter(c, transportId, rotationPeriod);
} catch (DbException e) { } catch (DbException e) {
if (LOG.isLoggable(WARNING)) if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e); LOG.log(WARNING, e.toString(), e);
} }
return null;
} }
}); });
} }
StreamContext recogniseTag(byte[] tag) { StreamContext recogniseTag(byte[] tag) {
StreamContext ctx; StreamContext ctx;
Future<Void> saved;
lock.lock(); lock.lock();
try { try {
// Look up the incoming keys for the tag // Look up the incoming keys for the tag
@@ -221,19 +240,31 @@ class TransportKeyManager extends TimerTask {
inContexts.remove(new Bytes(removeTag)); inContexts.remove(new Bytes(removeTag));
} }
// Write the window back to the DB // Write the window back to the DB
saveReorderingWindow(tagCtx.contactId, inKeys.getRotationPeriod(), saved = saveReorderingWindow(tagCtx.contactId,
window.getBase(), window.getBitmap()); inKeys.getRotationPeriod(), window.getBase(),
window.getBitmap());
} finally { } finally {
lock.unlock(); lock.unlock();
} }
// TODO: Wait for save to complete, return null if it fails // Wait for the save to complete before returning the stream context
try {
saved.get();
} catch (InterruptedException e) {
LOG.warning("Interrupted while updating reordering window");
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
return null;
}
return ctx; return ctx;
} }
private void saveReorderingWindow(final ContactId c, private Future<Void> saveReorderingWindow(final ContactId c,
final long rotationPeriod, final long base, final byte[] bitmap) { final long rotationPeriod, final long base, final byte[] bitmap) {
dbExecutor.execute(new Runnable() { return dbExecutor.submit(new Callable<Void>() {
public void run() { public Void call() {
try { try {
db.setReorderingWindow(c, transportId, rotationPeriod, db.setReorderingWindow(c, transportId, rotationPeriod,
base, bitmap); base, bitmap);
@@ -241,6 +272,7 @@ class TransportKeyManager extends TimerTask {
if (LOG.isLoggable(WARNING)) if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e); LOG.log(WARNING, e.toString(), e);
} }
return null;
} }
}); });
} }