mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 10:49:06 +01:00
Merge branch '235-transport-key-manager-deadlock' into 'master'
Avoid potential deadlock in TransportKeyManager. #235 See rambling description on the ticket and in the architecture channel... Fixes #235. See merge request !79
This commit is contained in:
@@ -28,6 +28,10 @@ import java.util.Map;
|
||||
/**
|
||||
* Encapsulates the database implementation and exposes high-level operations
|
||||
* to other components.
|
||||
* <p>
|
||||
* This interface's methods are blocking, but they do not call out into other
|
||||
* components except to broadcast {@link org.briarproject.api.event.Event
|
||||
* Events}, so they can safely be called while holding locks.
|
||||
*/
|
||||
public interface DatabaseComponent {
|
||||
|
||||
|
||||
@@ -3,5 +3,9 @@ package org.briarproject.api.event;
|
||||
/** An interface for receiving notifications when events occur. */
|
||||
public interface EventListener {
|
||||
|
||||
/**
|
||||
* Called when an event is broadcast. Implementations of this method must
|
||||
* not block.
|
||||
*/
|
||||
void eventOccurred(Event e);
|
||||
}
|
||||
|
||||
@@ -101,7 +101,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
|
||||
dbExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
TransportKeyManager m = new TransportKeyManager(db, crypto,
|
||||
dbExecutor, timer, clock, t, maxLatency);
|
||||
timer, clock, t, maxLatency);
|
||||
// Don't add transport twice if event is received during startup
|
||||
if (managers.putIfAbsent(t, m) == null) m.start();
|
||||
}
|
||||
|
||||
@@ -18,10 +18,6 @@ import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
@@ -37,7 +33,6 @@ class TransportKeyManager extends TimerTask {
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final CryptoComponent crypto;
|
||||
private final ExecutorService dbExecutor;
|
||||
private final Timer timer;
|
||||
private final Clock clock;
|
||||
private final TransportId transportId;
|
||||
@@ -50,11 +45,10 @@ class TransportKeyManager extends TimerTask {
|
||||
private final Map<ContactId, MutableTransportKeys> keys;
|
||||
|
||||
TransportKeyManager(DatabaseComponent db, CryptoComponent crypto,
|
||||
ExecutorService dbExecutor, Timer timer, Clock clock,
|
||||
TransportId transportId, long maxLatency) {
|
||||
Timer timer, Clock clock, TransportId transportId,
|
||||
long maxLatency) {
|
||||
this.db = db;
|
||||
this.crypto = crypto;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.timer = timer;
|
||||
this.clock = clock;
|
||||
this.transportId = transportId;
|
||||
@@ -66,36 +60,39 @@ class TransportKeyManager extends TimerTask {
|
||||
}
|
||||
|
||||
void start() {
|
||||
// Load the transport keys from the DB
|
||||
Map<ContactId, TransportKeys> loaded;
|
||||
try {
|
||||
loaded = db.getTransportKeys(transportId);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return;
|
||||
}
|
||||
// Rotate the keys to the current rotation period
|
||||
Map<ContactId, TransportKeys> rotated =
|
||||
new HashMap<ContactId, TransportKeys>();
|
||||
Map<ContactId, TransportKeys> current =
|
||||
new HashMap<ContactId, TransportKeys>();
|
||||
long now = clock.currentTimeMillis();
|
||||
long rotationPeriod = now / rotationPeriodLength;
|
||||
for (Entry<ContactId, TransportKeys> e : loaded.entrySet()) {
|
||||
ContactId c = e.getKey();
|
||||
TransportKeys k = e.getValue();
|
||||
TransportKeys k1 = crypto.rotateTransportKeys(k, rotationPeriod);
|
||||
if (k1.getRotationPeriod() > k.getRotationPeriod())
|
||||
rotated.put(c, k1);
|
||||
current.put(c, k1);
|
||||
}
|
||||
lock.lock();
|
||||
try {
|
||||
// Load the transport keys from the DB
|
||||
Map<ContactId, TransportKeys> loaded;
|
||||
try {
|
||||
loaded = db.getTransportKeys(transportId);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return;
|
||||
}
|
||||
// Rotate the keys to the current rotation period
|
||||
Map<ContactId, TransportKeys> rotated =
|
||||
new HashMap<ContactId, TransportKeys>();
|
||||
Map<ContactId, TransportKeys> current =
|
||||
new HashMap<ContactId, TransportKeys>();
|
||||
long rotationPeriod = now / rotationPeriodLength;
|
||||
for (Entry<ContactId, TransportKeys> e : loaded.entrySet()) {
|
||||
ContactId c = e.getKey();
|
||||
TransportKeys k = e.getValue();
|
||||
TransportKeys k1 = crypto.rotateTransportKeys(k,
|
||||
rotationPeriod);
|
||||
if (k1.getRotationPeriod() > k.getRotationPeriod())
|
||||
rotated.put(c, k1);
|
||||
current.put(c, k1);
|
||||
}
|
||||
// Initialise mutable state for all contacts
|
||||
for (Entry<ContactId, TransportKeys> e : current.entrySet())
|
||||
addKeys(e.getKey(), new MutableTransportKeys(e.getValue()));
|
||||
// Write any rotated keys back to the DB
|
||||
saveTransportKeys(rotated);
|
||||
db.updateTransportKeys(rotated);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
@@ -123,54 +120,29 @@ class TransportKeyManager extends TimerTask {
|
||||
}
|
||||
}
|
||||
|
||||
private void saveTransportKeys(
|
||||
final Map<ContactId, TransportKeys> rotated) {
|
||||
dbExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
db.updateTransportKeys(rotated);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void addContact(ContactId c, SecretKey master, long timestamp,
|
||||
boolean alice) {
|
||||
// Work out what rotation period the timestamp belongs to
|
||||
long rotationPeriod = timestamp / rotationPeriodLength;
|
||||
// Derive the transport keys
|
||||
TransportKeys k = crypto.deriveTransportKeys(transportId, master,
|
||||
rotationPeriod, alice);
|
||||
// Rotate the keys to the current rotation period if necessary
|
||||
rotationPeriod = clock.currentTimeMillis() / rotationPeriodLength;
|
||||
k = crypto.rotateTransportKeys(k, rotationPeriod);
|
||||
lock.lock();
|
||||
try {
|
||||
// Work out what rotation period the timestamp belongs to
|
||||
long rotationPeriod = timestamp / rotationPeriodLength;
|
||||
// Derive the transport keys
|
||||
TransportKeys k = crypto.deriveTransportKeys(transportId, master,
|
||||
rotationPeriod, alice);
|
||||
// Rotate the keys to the current rotation period if necessary
|
||||
rotationPeriod = clock.currentTimeMillis() / rotationPeriodLength;
|
||||
k = crypto.rotateTransportKeys(k, rotationPeriod);
|
||||
// Initialise mutable state for the contact
|
||||
addKeys(c, new MutableTransportKeys(k));
|
||||
// Write the keys back to the DB
|
||||
saveTransportKeys(c, k);
|
||||
db.addTransportKeys(c, k);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void saveTransportKeys(final ContactId c, final TransportKeys k) {
|
||||
dbExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
db.addTransportKeys(c, k);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void removeContact(ContactId c) {
|
||||
lock.lock();
|
||||
try {
|
||||
@@ -187,8 +159,6 @@ class TransportKeyManager extends TimerTask {
|
||||
}
|
||||
|
||||
StreamContext getStreamContext(ContactId c) {
|
||||
StreamContext ctx;
|
||||
Future<Void> saved;
|
||||
lock.lock();
|
||||
try {
|
||||
// Look up the outgoing keys for the contact
|
||||
@@ -196,48 +166,23 @@ class TransportKeyManager extends TimerTask {
|
||||
if (outKeys == null) return null;
|
||||
if (outKeys.getStreamCounter() > MAX_32_BIT_UNSIGNED) return null;
|
||||
// Create a stream context
|
||||
ctx = new StreamContext(c, transportId, outKeys.getTagKey(),
|
||||
outKeys.getHeaderKey(), outKeys.getStreamCounter());
|
||||
StreamContext ctx = new StreamContext(c, transportId,
|
||||
outKeys.getTagKey(), outKeys.getHeaderKey(),
|
||||
outKeys.getStreamCounter());
|
||||
// Increment the stream counter and write it back to the DB
|
||||
outKeys.incrementStreamCounter();
|
||||
saved = saveIncrementedStreamCounter(c,
|
||||
db.incrementStreamCounter(c, transportId,
|
||||
outKeys.getRotationPeriod());
|
||||
return ctx;
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return null;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
// Wait for the save to complete before returning the stream context
|
||||
try {
|
||||
saved.get();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warning("Interrupted while incrementing stream counter");
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
} catch (ExecutionException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
return null;
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
|
||||
private Future<Void> saveIncrementedStreamCounter(final ContactId c,
|
||||
final long rotationPeriod) {
|
||||
return dbExecutor.submit(new Callable<Void>() {
|
||||
public Void call() {
|
||||
try {
|
||||
db.incrementStreamCounter(c, transportId, rotationPeriod);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
StreamContext recogniseTag(byte[] tag) {
|
||||
StreamContext ctx;
|
||||
Future<Void> saved;
|
||||
lock.lock();
|
||||
try {
|
||||
// Look up the incoming keys for the tag
|
||||
@@ -245,7 +190,7 @@ class TransportKeyManager extends TimerTask {
|
||||
if (tagCtx == null) return null;
|
||||
MutableIncomingKeys inKeys = tagCtx.inKeys;
|
||||
// Create a stream context
|
||||
ctx = new StreamContext(tagCtx.contactId, transportId,
|
||||
StreamContext ctx = new StreamContext(tagCtx.contactId, transportId,
|
||||
inKeys.getTagKey(), inKeys.getHeaderKey(),
|
||||
tagCtx.streamNumber);
|
||||
// Update the reordering window
|
||||
@@ -265,45 +210,21 @@ class TransportKeyManager extends TimerTask {
|
||||
inContexts.remove(new Bytes(removeTag));
|
||||
}
|
||||
// Write the window back to the DB
|
||||
saved = saveReorderingWindow(tagCtx.contactId,
|
||||
db.setReorderingWindow(tagCtx.contactId, transportId,
|
||||
inKeys.getRotationPeriod(), window.getBase(),
|
||||
window.getBitmap());
|
||||
return ctx;
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return null;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
// Wait for the save to complete before returning the stream context
|
||||
try {
|
||||
saved.get();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warning("Interrupted while updating reordering window");
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
} catch (ExecutionException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
return null;
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
|
||||
private Future<Void> saveReorderingWindow(final ContactId c,
|
||||
final long rotationPeriod, final long base, final byte[] bitmap) {
|
||||
return dbExecutor.submit(new Callable<Void>() {
|
||||
public Void call() {
|
||||
try {
|
||||
db.setReorderingWindow(c, transportId, rotationPeriod,
|
||||
base, bitmap);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long now = clock.currentTimeMillis();
|
||||
lock.lock();
|
||||
try {
|
||||
// Rotate the keys to the current rotation period
|
||||
@@ -311,7 +232,6 @@ class TransportKeyManager extends TimerTask {
|
||||
new HashMap<ContactId, TransportKeys>();
|
||||
Map<ContactId, TransportKeys> current =
|
||||
new HashMap<ContactId, TransportKeys>();
|
||||
long now = clock.currentTimeMillis();
|
||||
long rotationPeriod = now / rotationPeriodLength;
|
||||
for (Entry<ContactId, MutableTransportKeys> e : keys.entrySet()) {
|
||||
ContactId c = e.getKey();
|
||||
@@ -329,12 +249,13 @@ class TransportKeyManager extends TimerTask {
|
||||
for (Entry<ContactId, TransportKeys> e : current.entrySet())
|
||||
addKeys(e.getKey(), new MutableTransportKeys(e.getValue()));
|
||||
// Write any rotated keys back to the DB
|
||||
saveTransportKeys(rotated);
|
||||
db.updateTransportKeys(rotated);
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
// Schedule the next key rotation
|
||||
long now = clock.currentTimeMillis();
|
||||
long delay = rotationPeriodLength - now % rotationPeriodLength;
|
||||
timer.schedule(this, delay);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user