Fixed potential deadlock in TransportKeyManager. #235

This commit is contained in:
akwizgran
2016-01-26 10:32:12 +00:00
parent f2e1723b24
commit 06ade19260
2 changed files with 58 additions and 137 deletions

View File

@@ -104,7 +104,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
dbExecutor.execute(new Runnable() {
public void run() {
TransportKeyManager m = new TransportKeyManager(db, crypto,
dbExecutor, timer, clock, t, maxLatency);
timer, clock, t, maxLatency);
// Don't add transport twice if event is received during startup
if (managers.putIfAbsent(t, m) == null) m.start();
}

View File

@@ -18,10 +18,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
@@ -37,7 +33,6 @@ class TransportKeyManager extends TimerTask {
private final DatabaseComponent db;
private final CryptoComponent crypto;
private final ExecutorService dbExecutor;
private final Timer timer;
private final Clock clock;
private final TransportId transportId;
@@ -50,11 +45,10 @@ class TransportKeyManager extends TimerTask {
private final Map<ContactId, MutableTransportKeys> keys;
TransportKeyManager(DatabaseComponent db, CryptoComponent crypto,
ExecutorService dbExecutor, Timer timer, Clock clock,
TransportId transportId, long maxLatency) {
Timer timer, Clock clock, TransportId transportId,
long maxLatency) {
this.db = db;
this.crypto = crypto;
this.dbExecutor = dbExecutor;
this.timer = timer;
this.clock = clock;
this.transportId = transportId;
@@ -66,36 +60,39 @@ class TransportKeyManager extends TimerTask {
}
void start() {
// Load the transport keys from the DB
Map<ContactId, TransportKeys> loaded;
try {
loaded = db.getTransportKeys(transportId);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return;
}
// Rotate the keys to the current rotation period
Map<ContactId, TransportKeys> rotated =
new HashMap<ContactId, TransportKeys>();
Map<ContactId, TransportKeys> current =
new HashMap<ContactId, TransportKeys>();
long now = clock.currentTimeMillis();
long rotationPeriod = now / rotationPeriodLength;
for (Entry<ContactId, TransportKeys> e : loaded.entrySet()) {
ContactId c = e.getKey();
TransportKeys k = e.getValue();
TransportKeys k1 = crypto.rotateTransportKeys(k, rotationPeriod);
if (k1.getRotationPeriod() > k.getRotationPeriod())
rotated.put(c, k1);
current.put(c, k1);
}
lock.lock();
try {
// Load the transport keys from the DB
Map<ContactId, TransportKeys> loaded;
try {
loaded = db.getTransportKeys(transportId);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return;
}
// Rotate the keys to the current rotation period
Map<ContactId, TransportKeys> rotated =
new HashMap<ContactId, TransportKeys>();
Map<ContactId, TransportKeys> current =
new HashMap<ContactId, TransportKeys>();
long rotationPeriod = now / rotationPeriodLength;
for (Entry<ContactId, TransportKeys> e : loaded.entrySet()) {
ContactId c = e.getKey();
TransportKeys k = e.getValue();
TransportKeys k1 = crypto.rotateTransportKeys(k,
rotationPeriod);
if (k1.getRotationPeriod() > k.getRotationPeriod())
rotated.put(c, k1);
current.put(c, k1);
}
// Initialise mutable state for all contacts
for (Entry<ContactId, TransportKeys> e : current.entrySet())
addKeys(e.getKey(), new MutableTransportKeys(e.getValue()));
// Write any rotated keys back to the DB
saveTransportKeys(rotated);
db.updateTransportKeys(rotated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.unlock();
}
@@ -123,54 +120,29 @@ class TransportKeyManager extends TimerTask {
}
}
private void saveTransportKeys(
final Map<ContactId, TransportKeys> rotated) {
dbExecutor.execute(new Runnable() {
public void run() {
try {
db.updateTransportKeys(rotated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
void addContact(ContactId c, SecretKey master, long timestamp,
boolean alice) {
// Work out what rotation period the timestamp belongs to
long rotationPeriod = timestamp / rotationPeriodLength;
// Derive the transport keys
TransportKeys k = crypto.deriveTransportKeys(transportId, master,
rotationPeriod, alice);
// Rotate the keys to the current rotation period if necessary
rotationPeriod = clock.currentTimeMillis() / rotationPeriodLength;
k = crypto.rotateTransportKeys(k, rotationPeriod);
lock.lock();
try {
// Work out what rotation period the timestamp belongs to
long rotationPeriod = timestamp / rotationPeriodLength;
// Derive the transport keys
TransportKeys k = crypto.deriveTransportKeys(transportId, master,
rotationPeriod, alice);
// Rotate the keys to the current rotation period if necessary
rotationPeriod = clock.currentTimeMillis() / rotationPeriodLength;
k = crypto.rotateTransportKeys(k, rotationPeriod);
// Initialise mutable state for the contact
addKeys(c, new MutableTransportKeys(k));
// Write the keys back to the DB
saveTransportKeys(c, k);
db.addTransportKeys(c, k);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.unlock();
}
}
private void saveTransportKeys(final ContactId c, final TransportKeys k) {
dbExecutor.execute(new Runnable() {
public void run() {
try {
db.addTransportKeys(c, k);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
}
});
}
void removeContact(ContactId c) {
lock.lock();
try {
@@ -187,8 +159,6 @@ class TransportKeyManager extends TimerTask {
}
StreamContext getStreamContext(ContactId c) {
StreamContext ctx;
Future<Void> saved;
lock.lock();
try {
// Look up the outgoing keys for the contact
@@ -196,48 +166,23 @@ class TransportKeyManager extends TimerTask {
if (outKeys == null) return null;
if (outKeys.getStreamCounter() > MAX_32_BIT_UNSIGNED) return null;
// Create a stream context
ctx = new StreamContext(c, transportId, outKeys.getTagKey(),
outKeys.getHeaderKey(), outKeys.getStreamCounter());
StreamContext ctx = new StreamContext(c, transportId,
outKeys.getTagKey(), outKeys.getHeaderKey(),
outKeys.getStreamCounter());
// Increment the stream counter and write it back to the DB
outKeys.incrementStreamCounter();
saved = saveIncrementedStreamCounter(c,
db.incrementStreamCounter(c, transportId,
outKeys.getRotationPeriod());
return ctx;
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null;
} finally {
lock.unlock();
}
// Wait for the save to complete before returning the stream context
try {
saved.get();
} catch (InterruptedException e) {
LOG.warning("Interrupted while incrementing stream counter");
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
return null;
}
return ctx;
}
private Future<Void> saveIncrementedStreamCounter(final ContactId c,
final long rotationPeriod) {
return dbExecutor.submit(new Callable<Void>() {
public Void call() {
try {
db.incrementStreamCounter(c, transportId, rotationPeriod);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
return null;
}
});
}
StreamContext recogniseTag(byte[] tag) {
StreamContext ctx;
Future<Void> saved;
lock.lock();
try {
// Look up the incoming keys for the tag
@@ -245,7 +190,7 @@ class TransportKeyManager extends TimerTask {
if (tagCtx == null) return null;
MutableIncomingKeys inKeys = tagCtx.inKeys;
// Create a stream context
ctx = new StreamContext(tagCtx.contactId, transportId,
StreamContext ctx = new StreamContext(tagCtx.contactId, transportId,
inKeys.getTagKey(), inKeys.getHeaderKey(),
tagCtx.streamNumber);
// Update the reordering window
@@ -265,45 +210,21 @@ class TransportKeyManager extends TimerTask {
inContexts.remove(new Bytes(removeTag));
}
// Write the window back to the DB
saved = saveReorderingWindow(tagCtx.contactId,
db.setReorderingWindow(tagCtx.contactId, transportId,
inKeys.getRotationPeriod(), window.getBase(),
window.getBitmap());
return ctx;
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null;
} finally {
lock.unlock();
}
// Wait for the save to complete before returning the stream context
try {
saved.get();
} catch (InterruptedException e) {
LOG.warning("Interrupted while updating reordering window");
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
return null;
}
return ctx;
}
private Future<Void> saveReorderingWindow(final ContactId c,
final long rotationPeriod, final long base, final byte[] bitmap) {
return dbExecutor.submit(new Callable<Void>() {
public Void call() {
try {
db.setReorderingWindow(c, transportId, rotationPeriod,
base, bitmap);
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
}
return null;
}
});
}
@Override
public void run() {
long now = clock.currentTimeMillis();
lock.lock();
try {
// Rotate the keys to the current rotation period
@@ -311,7 +232,6 @@ class TransportKeyManager extends TimerTask {
new HashMap<ContactId, TransportKeys>();
Map<ContactId, TransportKeys> current =
new HashMap<ContactId, TransportKeys>();
long now = clock.currentTimeMillis();
long rotationPeriod = now / rotationPeriodLength;
for (Entry<ContactId, MutableTransportKeys> e : keys.entrySet()) {
ContactId c = e.getKey();
@@ -329,12 +249,13 @@ class TransportKeyManager extends TimerTask {
for (Entry<ContactId, TransportKeys> e : current.entrySet())
addKeys(e.getKey(), new MutableTransportKeys(e.getValue()));
// Write any rotated keys back to the DB
saveTransportKeys(rotated);
db.updateTransportKeys(rotated);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.unlock();
}
// Schedule the next key rotation
long now = clock.currentTimeMillis();
long delay = rotationPeriodLength - now % rotationPeriodLength;
timer.schedule(this, delay);
}