mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-14 19:59:05 +01:00
Merge branch 'AbrahamKiggundu/briar-master': better lock encapsulation
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
package org.briarproject.crypto;
|
||||
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.briarproject.api.crypto.MessageDigest;
|
||||
import org.spongycastle.crypto.BlockCipher;
|
||||
import org.spongycastle.crypto.digests.SHA256Digest;
|
||||
@@ -16,7 +19,9 @@ class FortunaGenerator {
|
||||
private static final int KEY_BYTES = 32;
|
||||
private static final int BLOCK_BYTES = 16;
|
||||
|
||||
// All of the following are locking: this
|
||||
private final Lock synchLock = new ReentrantLock();
|
||||
|
||||
// The following are locking: synchLock
|
||||
private final MessageDigest digest = new DoubleDigest(new SHA256Digest());
|
||||
private final BlockCipher cipher = new AESLightEngine();
|
||||
private final byte[] key = new byte[KEY_BYTES];
|
||||
@@ -28,56 +33,78 @@ class FortunaGenerator {
|
||||
reseed(seed);
|
||||
}
|
||||
|
||||
synchronized void reseed(byte[] seed) {
|
||||
digest.update(key);
|
||||
digest.update(seed);
|
||||
digest.digest(key, 0, KEY_BYTES);
|
||||
incrementCounter();
|
||||
void reseed(byte[] seed) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
digest.update(key);
|
||||
digest.update(seed);
|
||||
digest.digest(key, 0, KEY_BYTES);
|
||||
incrementCounter();
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Package access for testing
|
||||
synchronized void incrementCounter() {
|
||||
counter[0]++;
|
||||
for(int i = 0; counter[i] == 0; i++) {
|
||||
if(i + 1 == BLOCK_BYTES)
|
||||
throw new RuntimeException("Counter exhausted");
|
||||
counter[i + 1]++;
|
||||
void incrementCounter() {
|
||||
synchLock.lock();
|
||||
try {
|
||||
counter[0]++;
|
||||
for(int i = 0; counter[i] == 0; i++) {
|
||||
if(i + 1 == BLOCK_BYTES)
|
||||
throw new RuntimeException("Counter exhausted");
|
||||
counter[i + 1]++;
|
||||
}
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// Package access for testing
|
||||
synchronized byte[] getCounter() {
|
||||
return counter;
|
||||
byte[] getCounter() {
|
||||
synchLock.lock();
|
||||
try {
|
||||
return counter;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
synchronized int nextBytes(byte[] dest, int off, int len) {
|
||||
// Don't write more than the maximum number of bytes in one request
|
||||
if(len > MAX_BYTES_PER_REQUEST) len = MAX_BYTES_PER_REQUEST;
|
||||
cipher.init(true, new KeyParameter(key));
|
||||
// Generate full blocks directly into the output buffer
|
||||
int fullBlocks = len / BLOCK_BYTES;
|
||||
for(int i = 0; i < fullBlocks; i++) {
|
||||
cipher.processBlock(counter, 0, dest, off + i * BLOCK_BYTES);
|
||||
incrementCounter();
|
||||
int nextBytes(byte[] dest, int off, int len) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
// Don't write more than the maximum number of bytes in one request
|
||||
if(len > MAX_BYTES_PER_REQUEST) len = MAX_BYTES_PER_REQUEST;
|
||||
cipher.init(true, new KeyParameter(key));
|
||||
// Generate full blocks directly into the output buffer
|
||||
int fullBlocks = len / BLOCK_BYTES;
|
||||
for(int i = 0; i < fullBlocks; i++) {
|
||||
cipher.processBlock(counter, 0, dest, off + i * BLOCK_BYTES);
|
||||
incrementCounter();
|
||||
}
|
||||
// Generate a partial block if needed
|
||||
int done = fullBlocks * BLOCK_BYTES, remaining = len - done;
|
||||
assert remaining < BLOCK_BYTES;
|
||||
if(remaining > 0) {
|
||||
cipher.processBlock(counter, 0, buffer, 0);
|
||||
incrementCounter();
|
||||
// Copy the partial block to the output buffer and erase our copy
|
||||
System.arraycopy(buffer, 0, dest, off + done, remaining);
|
||||
for(int i = 0; i < BLOCK_BYTES; i++) buffer[i] = 0;
|
||||
}
|
||||
// Generate a new key
|
||||
for(int i = 0; i < KEY_BYTES / BLOCK_BYTES; i++) {
|
||||
cipher.processBlock(counter, 0, newKey, i * BLOCK_BYTES);
|
||||
incrementCounter();
|
||||
}
|
||||
System.arraycopy(newKey, 0, key, 0, KEY_BYTES);
|
||||
for(int i = 0; i < KEY_BYTES; i++) newKey[i] = 0;
|
||||
// Return the number of bytes written
|
||||
return len;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
// Generate a partial block if needed
|
||||
int done = fullBlocks * BLOCK_BYTES, remaining = len - done;
|
||||
assert remaining < BLOCK_BYTES;
|
||||
if(remaining > 0) {
|
||||
cipher.processBlock(counter, 0, buffer, 0);
|
||||
incrementCounter();
|
||||
// Copy the partial block to the output buffer and erase our copy
|
||||
System.arraycopy(buffer, 0, dest, off + done, remaining);
|
||||
for(int i = 0; i < BLOCK_BYTES; i++) buffer[i] = 0;
|
||||
}
|
||||
// Generate a new key
|
||||
for(int i = 0; i < KEY_BYTES / BLOCK_BYTES; i++) {
|
||||
cipher.processBlock(counter, 0, newKey, i * BLOCK_BYTES);
|
||||
incrementCounter();
|
||||
}
|
||||
System.arraycopy(newKey, 0, key, 0, KEY_BYTES);
|
||||
for(int i = 0; i < KEY_BYTES; i++) newKey[i] = 0;
|
||||
// Return the number of bytes written
|
||||
return len;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ class PseudoRandomImpl implements PseudoRandom {
|
||||
generator = new FortunaGenerator(seed);
|
||||
}
|
||||
|
||||
public synchronized byte[] nextBytes(int length) {
|
||||
public byte[] nextBytes(int length) {
|
||||
byte[] b = new byte[length];
|
||||
int offset = 0;
|
||||
while(offset < length) offset += generator.nextBytes(b, offset, length);
|
||||
|
||||
@@ -27,6 +27,9 @@ import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.briarproject.api.Author;
|
||||
@@ -313,16 +316,19 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
private final Clock clock;
|
||||
|
||||
private final LinkedList<Connection> connections =
|
||||
new LinkedList<Connection>(); // Locking: self
|
||||
new LinkedList<Connection>(); // Locking: connectionsLock
|
||||
|
||||
private final AtomicInteger transactionCount = new AtomicInteger(0);
|
||||
|
||||
private int openConnections = 0; // Locking: connections
|
||||
private boolean closed = false; // Locking: connections
|
||||
private int openConnections = 0; // Locking: connectionsLock
|
||||
private boolean closed = false; // Locking: connectionsLock
|
||||
|
||||
protected abstract Connection createConnection() throws SQLException;
|
||||
protected abstract void flushBuffersToDisk(Statement s) throws SQLException;
|
||||
|
||||
private final Lock connectionsLock = new ReentrantLock();
|
||||
private final Condition connectionsChanged = connectionsLock.newCondition();
|
||||
|
||||
JdbcDatabase(String hashType, String binaryType, String counterType,
|
||||
String secretType, Clock clock) {
|
||||
this.hashType = hashType;
|
||||
@@ -431,9 +437,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
|
||||
public Connection startTransaction() throws DbException {
|
||||
Connection txn = null;
|
||||
synchronized(connections) {
|
||||
connectionsLock.lock();
|
||||
try {
|
||||
if(closed) throw new DbClosedException();
|
||||
txn = connections.poll();
|
||||
} finally {
|
||||
connectionsLock.unlock();
|
||||
}
|
||||
try {
|
||||
if(txn == null) {
|
||||
@@ -441,8 +450,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
txn = createConnection();
|
||||
if(txn == null) throw new DbException();
|
||||
txn.setAutoCommit(false);
|
||||
synchronized(connections) {
|
||||
connectionsLock.lock();
|
||||
try {
|
||||
openConnections++;
|
||||
} finally {
|
||||
connectionsLock.unlock();
|
||||
}
|
||||
}
|
||||
} catch(SQLException e) {
|
||||
@@ -455,9 +467,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
public void abortTransaction(Connection txn) {
|
||||
try {
|
||||
txn.rollback();
|
||||
synchronized(connections) {
|
||||
connectionsLock.lock();
|
||||
try {
|
||||
connections.add(txn);
|
||||
connections.notifyAll();
|
||||
connectionsChanged.signalAll();
|
||||
} finally {
|
||||
connectionsLock.unlock();
|
||||
}
|
||||
} catch(SQLException e) {
|
||||
// Try to close the connection
|
||||
@@ -468,9 +483,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e1.toString(), e1);
|
||||
}
|
||||
// Whatever happens, allow the database to close
|
||||
synchronized(connections) {
|
||||
connectionsLock.lock();
|
||||
try {
|
||||
openConnections--;
|
||||
connections.notifyAll();
|
||||
connectionsChanged.signalAll();
|
||||
} finally {
|
||||
connectionsLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -486,9 +504,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
tryToClose(s);
|
||||
throw new DbException(e);
|
||||
}
|
||||
synchronized(connections) {
|
||||
connectionsLock.lock();
|
||||
try {
|
||||
connections.add(txn);
|
||||
connections.notifyAll();
|
||||
connectionsChanged.signalAll();
|
||||
} finally {
|
||||
connectionsLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -502,14 +523,15 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
|
||||
protected void closeAllConnections() throws SQLException {
|
||||
boolean interrupted = false;
|
||||
synchronized(connections) {
|
||||
connectionsLock.lock();
|
||||
try {
|
||||
closed = true;
|
||||
for(Connection c : connections) c.close();
|
||||
openConnections -= connections.size();
|
||||
connections.clear();
|
||||
while(openConnections > 0) {
|
||||
try {
|
||||
connections.wait();
|
||||
connectionsChanged.await();
|
||||
} catch(InterruptedException e) {
|
||||
LOG.warning("Interrupted while closing connections");
|
||||
interrupted = true;
|
||||
@@ -518,7 +540,10 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
openConnections -= connections.size();
|
||||
connections.clear();
|
||||
}
|
||||
} finally {
|
||||
connectionsLock.unlock();
|
||||
}
|
||||
|
||||
if(interrupted) Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.briarproject.api.Author;
|
||||
@@ -60,13 +62,9 @@ class ConnectorGroup extends Thread implements InvitationTask {
|
||||
private final Collection<InvitationListener> listeners;
|
||||
private final AtomicBoolean connected;
|
||||
private final CountDownLatch localConfirmationLatch;
|
||||
private final Lock synchLock = new ReentrantLock();
|
||||
|
||||
/*
|
||||
* All of the following require locking: this. We don't want to call the
|
||||
* listeners with a lock held, but we need to avoid a race condition in
|
||||
* addListener(), so the state that's accessed in addListener() after
|
||||
* calling listeners.add() must be guarded by a lock.
|
||||
*/
|
||||
// The following are locking: synchLock
|
||||
private int localConfirmationCode = -1, remoteConfirmationCode = -1;
|
||||
private boolean connectionFailed = false;
|
||||
private boolean localCompared = false, remoteCompared = false;
|
||||
@@ -104,12 +102,18 @@ class ConnectorGroup extends Thread implements InvitationTask {
|
||||
localConfirmationLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public synchronized InvitationState addListener(InvitationListener l) {
|
||||
listeners.add(l);
|
||||
return new InvitationState(localInvitationCode, remoteInvitationCode,
|
||||
localConfirmationCode, remoteConfirmationCode, connected.get(),
|
||||
connectionFailed, localCompared, remoteCompared, localMatched,
|
||||
remoteMatched, remoteName);
|
||||
public InvitationState addListener(InvitationListener l) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
listeners.add(l);
|
||||
return new InvitationState(localInvitationCode,
|
||||
remoteInvitationCode, localConfirmationCode,
|
||||
remoteConfirmationCode, connected.get(), connectionFailed,
|
||||
localCompared, remoteCompared, localMatched, remoteMatched,
|
||||
remoteName);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void removeListener(InvitationListener l) {
|
||||
@@ -130,8 +134,11 @@ class ConnectorGroup extends Thread implements InvitationTask {
|
||||
localProps = db.getLocalProperties();
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
connectionFailed = true;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
for(InvitationListener l : listeners) l.connectionFailed();
|
||||
return;
|
||||
@@ -163,8 +170,11 @@ class ConnectorGroup extends Thread implements InvitationTask {
|
||||
}
|
||||
// If none of the threads connected, inform the listeners
|
||||
if(!connected.get()) {
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
connectionFailed = true;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
for(InvitationListener l : listeners) l.connectionFailed();
|
||||
}
|
||||
@@ -193,17 +203,23 @@ class ConnectorGroup extends Thread implements InvitationTask {
|
||||
}
|
||||
|
||||
public void localConfirmationSucceeded() {
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
localCompared = true;
|
||||
localMatched = true;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
localConfirmationLatch.countDown();
|
||||
}
|
||||
|
||||
public void localConfirmationFailed() {
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
localCompared = true;
|
||||
localMatched = false;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
localConfirmationLatch.countDown();
|
||||
}
|
||||
@@ -216,9 +232,12 @@ class ConnectorGroup extends Thread implements InvitationTask {
|
||||
}
|
||||
|
||||
void keyAgreementSucceeded(int localCode, int remoteCode) {
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
localConfirmationCode = localCode;
|
||||
remoteConfirmationCode = remoteCode;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
for(InvitationListener l : listeners)
|
||||
l.keyAgreementSucceeded(localCode, remoteCode);
|
||||
@@ -230,31 +249,43 @@ class ConnectorGroup extends Thread implements InvitationTask {
|
||||
|
||||
boolean waitForLocalConfirmationResult() throws InterruptedException {
|
||||
localConfirmationLatch.await(CONFIRMATION_TIMEOUT, MILLISECONDS);
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
return localMatched;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void remoteConfirmationSucceeded() {
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
remoteCompared = true;
|
||||
remoteMatched = true;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
for(InvitationListener l : listeners) l.remoteConfirmationSucceeded();
|
||||
}
|
||||
|
||||
void remoteConfirmationFailed() {
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
remoteCompared = true;
|
||||
remoteMatched = false;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
for(InvitationListener l : listeners) l.remoteConfirmationFailed();
|
||||
}
|
||||
|
||||
void pseudonymExchangeSucceeded(Author remoteAuthor) {
|
||||
String name = remoteAuthor.getName();
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
remoteName = name;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
for(InvitationListener l : listeners)
|
||||
l.pseudonymExchangeSucceeded(name);
|
||||
|
||||
@@ -2,34 +2,50 @@ package org.briarproject.lifecycle;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.briarproject.api.lifecycle.ShutdownManager;
|
||||
|
||||
class ShutdownManagerImpl implements ShutdownManager {
|
||||
|
||||
protected final Map<Integer, Thread> hooks; // Locking: this
|
||||
private final Lock synchLock = new ReentrantLock();
|
||||
|
||||
private int nextHandle = 0; // Locking: this
|
||||
// The following are locking: synchLock
|
||||
protected final Map<Integer, Thread> hooks;
|
||||
private int nextHandle = 0;
|
||||
|
||||
ShutdownManagerImpl() {
|
||||
hooks = new HashMap<Integer, Thread>();
|
||||
}
|
||||
|
||||
public synchronized int addShutdownHook(Runnable r) {
|
||||
int handle = nextHandle++;
|
||||
Thread hook = createThread(r);
|
||||
hooks.put(handle, hook);
|
||||
Runtime.getRuntime().addShutdownHook(hook);
|
||||
return handle;
|
||||
public int addShutdownHook(Runnable r) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
int handle = nextHandle++;
|
||||
Thread hook = createThread(r);
|
||||
hooks.put(handle, hook);
|
||||
Runtime.getRuntime().addShutdownHook(hook);
|
||||
return handle;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected Thread createThread(Runnable r) {
|
||||
return new Thread(r, "ShutdownManager");
|
||||
}
|
||||
|
||||
public synchronized boolean removeShutdownHook(int handle) {
|
||||
Thread hook = hooks.remove(handle);
|
||||
if(hook == null) return false;
|
||||
else return Runtime.getRuntime().removeShutdownHook(hook);
|
||||
public boolean removeShutdownHook(int handle) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
Thread hook = hooks.remove(handle);
|
||||
if(hook == null) return false;
|
||||
else return Runtime.getRuntime().removeShutdownHook(hook);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,9 +9,9 @@ import org.briarproject.api.messaging.Group;
|
||||
import org.briarproject.api.messaging.GroupFactory;
|
||||
import org.briarproject.api.messaging.MessageFactory;
|
||||
import org.briarproject.api.messaging.MessageVerifier;
|
||||
import org.briarproject.api.messaging.MessagingSessionFactory;
|
||||
import org.briarproject.api.messaging.PacketReaderFactory;
|
||||
import org.briarproject.api.messaging.PacketWriterFactory;
|
||||
import org.briarproject.api.messaging.MessagingSessionFactory;
|
||||
import org.briarproject.api.messaging.SubscriptionUpdate;
|
||||
import org.briarproject.api.messaging.UnverifiedMessage;
|
||||
import org.briarproject.api.serial.StructReader;
|
||||
|
||||
@@ -8,6 +8,8 @@ import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.briarproject.api.ContactId;
|
||||
@@ -25,9 +27,10 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
|
||||
Logger.getLogger(ConnectionRegistryImpl.class.getName());
|
||||
|
||||
private final EventBus eventBus;
|
||||
// Locking: this
|
||||
private final Lock synchLock = new ReentrantLock();
|
||||
|
||||
// The following are locking: synchLock
|
||||
private final Map<TransportId, Map<ContactId, Integer>> connections;
|
||||
// Locking: this
|
||||
private final Map<ContactId, Integer> contactCounts;
|
||||
|
||||
@Inject
|
||||
@@ -40,7 +43,8 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
|
||||
public void registerConnection(ContactId c, TransportId t) {
|
||||
LOG.info("Connection registered");
|
||||
boolean firstConnection = false;
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
Map<ContactId, Integer> m = connections.get(t);
|
||||
if(m == null) {
|
||||
m = new HashMap<ContactId, Integer>();
|
||||
@@ -56,7 +60,10 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
|
||||
} else {
|
||||
contactCounts.put(c, count + 1);
|
||||
}
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
if(firstConnection) {
|
||||
LOG.info("Contact connected");
|
||||
eventBus.broadcast(new ContactConnectedEvent(c));
|
||||
@@ -66,7 +73,8 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
|
||||
public void unregisterConnection(ContactId c, TransportId t) {
|
||||
LOG.info("Connection unregistered");
|
||||
boolean lastConnection = false;
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
Map<ContactId, Integer> m = connections.get(t);
|
||||
if(m == null) throw new IllegalArgumentException();
|
||||
Integer count = m.remove(c);
|
||||
@@ -84,23 +92,38 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
|
||||
} else {
|
||||
contactCounts.put(c, count - 1);
|
||||
}
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
if(lastConnection) {
|
||||
LOG.info("Contact disconnected");
|
||||
eventBus.broadcast(new ContactDisconnectedEvent(c));
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Collection<ContactId> getConnectedContacts(
|
||||
public Collection<ContactId> getConnectedContacts(
|
||||
TransportId t) {
|
||||
Map<ContactId, Integer> m = connections.get(t);
|
||||
if(m == null) return Collections.emptyList();
|
||||
List<ContactId> ids = new ArrayList<ContactId>(m.keySet());
|
||||
if(LOG.isLoggable(INFO)) LOG.info(ids.size() + " contacts connected");
|
||||
return Collections.unmodifiableList(ids);
|
||||
synchLock.lock();
|
||||
try {
|
||||
Map<ContactId, Integer> m = connections.get(t);
|
||||
if(m == null) return Collections.emptyList();
|
||||
List<ContactId> ids = new ArrayList<ContactId>(m.keySet());
|
||||
if(LOG.isLoggable(INFO)) LOG.info(ids.size() + " contacts connected");
|
||||
return Collections.unmodifiableList(ids);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public synchronized boolean isConnected(ContactId c) {
|
||||
return contactCounts.containsKey(c);
|
||||
public boolean isConnected(ContactId c) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
return contactCounts.containsKey(c);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package org.briarproject.reliability;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.briarproject.api.reliability.ReadHandler;
|
||||
import org.briarproject.api.system.Clock;
|
||||
@@ -16,9 +21,13 @@ class Receiver implements ReadHandler {
|
||||
|
||||
private final Clock clock;
|
||||
private final Sender sender;
|
||||
private final SortedSet<Data> dataFrames; // Locking: this
|
||||
private final Lock windowLock = new ReentrantLock();
|
||||
private final Condition dataFrameAvailable = windowLock.newCondition();
|
||||
|
||||
// The following are locking: windowLock
|
||||
private final SortedSet<Data> dataFrames;
|
||||
private int windowSize = MAX_WINDOW_SIZE;
|
||||
|
||||
private int windowSize = MAX_WINDOW_SIZE; // Locking: this
|
||||
private long finalSequenceNumber = Long.MAX_VALUE;
|
||||
private long nextSequenceNumber = 1;
|
||||
|
||||
@@ -30,36 +39,44 @@ class Receiver implements ReadHandler {
|
||||
dataFrames = new TreeSet<Data>(new SequenceNumberComparator());
|
||||
}
|
||||
|
||||
synchronized Data read() throws IOException, InterruptedException {
|
||||
long now = clock.currentTimeMillis(), end = now + READ_TIMEOUT;
|
||||
while(now < end && valid) {
|
||||
if(dataFrames.isEmpty()) {
|
||||
// Wait for a data frame
|
||||
wait(end - now);
|
||||
} else {
|
||||
Data d = dataFrames.first();
|
||||
if(d.getSequenceNumber() == nextSequenceNumber) {
|
||||
dataFrames.remove(d);
|
||||
// Update the window
|
||||
windowSize += d.getPayloadLength();
|
||||
sender.sendAck(0, windowSize);
|
||||
nextSequenceNumber++;
|
||||
return d;
|
||||
Data read() throws IOException, InterruptedException {
|
||||
windowLock.lock();
|
||||
try {
|
||||
long now = clock.currentTimeMillis(), end = now + READ_TIMEOUT;
|
||||
while(now < end && valid) {
|
||||
if(dataFrames.isEmpty()) {
|
||||
// Wait for a data frame
|
||||
dataFrameAvailable.await(end - now, MILLISECONDS);
|
||||
} else {
|
||||
// Wait for the next in-order data frame
|
||||
wait(end - now);
|
||||
Data d = dataFrames.first();
|
||||
if(d.getSequenceNumber() == nextSequenceNumber) {
|
||||
dataFrames.remove(d);
|
||||
// Update the window
|
||||
windowSize += d.getPayloadLength();
|
||||
sender.sendAck(0, windowSize);
|
||||
nextSequenceNumber++;
|
||||
return d;
|
||||
} else {
|
||||
// Wait for the next in-order data frame
|
||||
dataFrameAvailable.await(end - now, MILLISECONDS);
|
||||
}
|
||||
}
|
||||
now = clock.currentTimeMillis();
|
||||
}
|
||||
now = clock.currentTimeMillis();
|
||||
if(valid) throw new IOException("Read timed out");
|
||||
throw new IOException("Connection closed");
|
||||
} finally {
|
||||
windowLock.unlock();
|
||||
}
|
||||
if(valid) throw new IOException("Read timed out");
|
||||
throw new IOException("Connection closed");
|
||||
}
|
||||
|
||||
void invalidate() {
|
||||
valid = false;
|
||||
synchronized(this) {
|
||||
notifyAll();
|
||||
windowLock.lock();
|
||||
try {
|
||||
dataFrameAvailable.signalAll();
|
||||
} finally {
|
||||
windowLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,43 +96,48 @@ class Receiver implements ReadHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void handleData(byte[] b) throws IOException {
|
||||
if(b.length < Data.MIN_LENGTH || b.length > Data.MAX_LENGTH) {
|
||||
// Ignore data frame with invalid length
|
||||
return;
|
||||
}
|
||||
Data d = new Data(b);
|
||||
int payloadLength = d.getPayloadLength();
|
||||
if(payloadLength > windowSize) return; // No space in the window
|
||||
if(d.getChecksum() != d.calculateChecksum()) {
|
||||
// Ignore data frame with invalid checksum
|
||||
return;
|
||||
}
|
||||
long sequenceNumber = d.getSequenceNumber();
|
||||
if(sequenceNumber == 0) {
|
||||
// Window probe
|
||||
} else if(sequenceNumber < nextSequenceNumber) {
|
||||
// Duplicate data frame
|
||||
} else if(d.isLastFrame()) {
|
||||
finalSequenceNumber = sequenceNumber;
|
||||
// Remove any data frames with higher sequence numbers
|
||||
Iterator<Data> it = dataFrames.iterator();
|
||||
while(it.hasNext()) {
|
||||
Data d1 = it.next();
|
||||
if(d1.getSequenceNumber() >= finalSequenceNumber) it.remove();
|
||||
private void handleData(byte[] b) throws IOException {
|
||||
windowLock.lock();
|
||||
try {
|
||||
if(b.length < Data.MIN_LENGTH || b.length > Data.MAX_LENGTH) {
|
||||
// Ignore data frame with invalid length
|
||||
return;
|
||||
}
|
||||
if(dataFrames.add(d)) {
|
||||
windowSize -= payloadLength;
|
||||
notifyAll();
|
||||
Data d = new Data(b);
|
||||
int payloadLength = d.getPayloadLength();
|
||||
if(payloadLength > windowSize) return; // No space in the window
|
||||
if(d.getChecksum() != d.calculateChecksum()) {
|
||||
// Ignore data frame with invalid checksum
|
||||
return;
|
||||
}
|
||||
} else if(sequenceNumber < finalSequenceNumber) {
|
||||
if(dataFrames.add(d)) {
|
||||
windowSize -= payloadLength;
|
||||
notifyAll();
|
||||
long sequenceNumber = d.getSequenceNumber();
|
||||
if(sequenceNumber == 0) {
|
||||
// Window probe
|
||||
} else if(sequenceNumber < nextSequenceNumber) {
|
||||
// Duplicate data frame
|
||||
} else if(d.isLastFrame()) {
|
||||
finalSequenceNumber = sequenceNumber;
|
||||
// Remove any data frames with higher sequence numbers
|
||||
Iterator<Data> it = dataFrames.iterator();
|
||||
while(it.hasNext()) {
|
||||
Data d1 = it.next();
|
||||
if(d1.getSequenceNumber() >= finalSequenceNumber) it.remove();
|
||||
}
|
||||
if(dataFrames.add(d)) {
|
||||
windowSize -= payloadLength;
|
||||
dataFrameAvailable.signalAll();
|
||||
}
|
||||
} else if(sequenceNumber < finalSequenceNumber) {
|
||||
if(dataFrames.add(d)) {
|
||||
windowSize -= payloadLength;
|
||||
dataFrameAvailable.signalAll();
|
||||
}
|
||||
}
|
||||
// Acknowledge the data frame even if it's a duplicate
|
||||
sender.sendAck(sequenceNumber, windowSize);
|
||||
} finally {
|
||||
windowLock.unlock();
|
||||
}
|
||||
// Acknowledge the data frame even if it's a duplicate
|
||||
sender.sendAck(sequenceNumber, windowSize);
|
||||
}
|
||||
|
||||
private static class SequenceNumberComparator implements Comparator<Data> {
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package org.briarproject.reliability;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.briarproject.api.reliability.WriteHandler;
|
||||
import org.briarproject.api.system.Clock;
|
||||
@@ -21,9 +26,11 @@ class Sender {
|
||||
|
||||
private final Clock clock;
|
||||
private final WriteHandler writeHandler;
|
||||
private final LinkedList<Outstanding> outstanding; // Locking: this
|
||||
private final Lock windowLock = new ReentrantLock();
|
||||
private final Condition sendWindowAvailable = windowLock.newCondition();
|
||||
|
||||
// All of the following are locking: this
|
||||
// The following are locking: windowLock
|
||||
private final LinkedList<Outstanding> outstanding;
|
||||
private int outstandingBytes = 0;
|
||||
private int windowSize = Data.MAX_PAYLOAD_LENGTH;
|
||||
private int rtt = INITIAL_RTT, rttVar = INITIAL_RTT_VAR;
|
||||
@@ -58,7 +65,8 @@ class Sender {
|
||||
long sequenceNumber = a.getSequenceNumber();
|
||||
long now = clock.currentTimeMillis();
|
||||
Outstanding fastRetransmit = null;
|
||||
synchronized(this) {
|
||||
windowLock.lock();
|
||||
try {
|
||||
// Remove the acked data frame if it's outstanding
|
||||
int foundIndex = -1;
|
||||
Iterator<Outstanding> it = outstanding.iterator();
|
||||
@@ -94,7 +102,10 @@ class Sender {
|
||||
// Don't accept an unreasonably large window size
|
||||
windowSize = Math.min(a.getWindowSize(), MAX_WINDOW_SIZE);
|
||||
// If space has become available, notify any waiting writers
|
||||
if(windowSize > oldWindowSize || foundIndex != -1) notifyAll();
|
||||
if(windowSize > oldWindowSize || foundIndex != -1)
|
||||
sendWindowAvailable.signalAll();
|
||||
} finally {
|
||||
windowLock.unlock();
|
||||
}
|
||||
// Fast retransmission
|
||||
if(fastRetransmit != null)
|
||||
@@ -105,7 +116,8 @@ class Sender {
|
||||
long now = clock.currentTimeMillis();
|
||||
List<Outstanding> retransmit = null;
|
||||
boolean sendProbe = false;
|
||||
synchronized(this) {
|
||||
windowLock.lock();
|
||||
try {
|
||||
if(outstanding.isEmpty()) {
|
||||
if(dataWaiting && now - lastWindowUpdateOrProbe > rto) {
|
||||
sendProbe = true;
|
||||
@@ -134,6 +146,8 @@ class Sender {
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
windowLock.unlock();
|
||||
}
|
||||
// Send a window probe if necessary
|
||||
if(sendProbe) {
|
||||
@@ -151,12 +165,13 @@ class Sender {
|
||||
|
||||
void write(Data d) throws IOException, InterruptedException {
|
||||
int payloadLength = d.getPayloadLength();
|
||||
synchronized(this) {
|
||||
windowLock.lock();
|
||||
try {
|
||||
// Wait for space in the window
|
||||
long now = clock.currentTimeMillis(), end = now + WRITE_TIMEOUT;
|
||||
while(now < end && outstandingBytes + payloadLength >= windowSize) {
|
||||
dataWaiting = true;
|
||||
wait(end - now);
|
||||
sendWindowAvailable.await(end - now, MILLISECONDS);
|
||||
now = clock.currentTimeMillis();
|
||||
}
|
||||
if(outstandingBytes + payloadLength >= windowSize)
|
||||
@@ -164,12 +179,20 @@ class Sender {
|
||||
outstanding.add(new Outstanding(d, now));
|
||||
outstandingBytes += payloadLength;
|
||||
dataWaiting = false;
|
||||
} finally {
|
||||
windowLock.unlock();
|
||||
}
|
||||
writeHandler.handleWrite(d.getBuffer());
|
||||
}
|
||||
|
||||
synchronized void flush() throws IOException, InterruptedException {
|
||||
while(dataWaiting || !outstanding.isEmpty()) wait();
|
||||
void flush() throws IOException, InterruptedException {
|
||||
windowLock.lock();
|
||||
try {
|
||||
while(dataWaiting || !outstanding.isEmpty())
|
||||
sendWindowAvailable.await();
|
||||
} finally {
|
||||
windowLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static class Outstanding {
|
||||
|
||||
@@ -11,6 +11,8 @@ import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.inject.Inject;
|
||||
@@ -48,8 +50,9 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
private final TagRecogniser tagRecogniser;
|
||||
private final Clock clock;
|
||||
private final Timer timer;
|
||||
private final Lock synchLock = new ReentrantLock();
|
||||
|
||||
// All of the following are locking: this
|
||||
// The following are locking: synchLock
|
||||
private final Map<TransportId, Integer> maxLatencies;
|
||||
private final Map<EndpointKey, TemporarySecret> oldSecrets;
|
||||
private final Map<EndpointKey, TemporarySecret> currentSecrets;
|
||||
@@ -71,45 +74,54 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
newSecrets = new HashMap<EndpointKey, TemporarySecret>();
|
||||
}
|
||||
|
||||
public synchronized boolean start() {
|
||||
eventBus.addListener(this);
|
||||
// Load the temporary secrets and transport latencies from the database
|
||||
Collection<TemporarySecret> secrets;
|
||||
public boolean start() {
|
||||
synchLock.lock();
|
||||
try {
|
||||
secrets = db.getSecrets();
|
||||
maxLatencies.putAll(db.getTransportLatencies());
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return false;
|
||||
}
|
||||
// Work out what phase of its lifecycle each secret is in
|
||||
long now = clock.currentTimeMillis();
|
||||
Collection<TemporarySecret> dead = assignSecretsToMaps(now, secrets);
|
||||
// Replace any dead secrets
|
||||
Collection<TemporarySecret> created = replaceDeadSecrets(now, dead);
|
||||
if(!created.isEmpty()) {
|
||||
// Store any secrets that have been created, removing any dead ones
|
||||
eventBus.addListener(this);
|
||||
// Load the temporary secrets and transport latencies from the DB
|
||||
Collection<TemporarySecret> secrets;
|
||||
try {
|
||||
db.addSecrets(created);
|
||||
secrets = db.getSecrets();
|
||||
maxLatencies.putAll(db.getTransportLatencies());
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return false;
|
||||
}
|
||||
// Work out what phase of its lifecycle each secret is in
|
||||
long now = clock.currentTimeMillis();
|
||||
Collection<TemporarySecret> dead =
|
||||
assignSecretsToMaps(now, secrets);
|
||||
// Replace any dead secrets
|
||||
Collection<TemporarySecret> created = replaceDeadSecrets(now, dead);
|
||||
if(!created.isEmpty()) {
|
||||
// Store any secrets that have been created,
|
||||
// removing any dead ones
|
||||
try {
|
||||
db.addSecrets(created);
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// Pass the old, current and new secrets to the recogniser
|
||||
for(TemporarySecret s : oldSecrets.values())
|
||||
tagRecogniser.addSecret(s);
|
||||
for(TemporarySecret s : currentSecrets.values())
|
||||
tagRecogniser.addSecret(s);
|
||||
for(TemporarySecret s : newSecrets.values())
|
||||
tagRecogniser.addSecret(s);
|
||||
// Schedule periodic key rotation
|
||||
timer.scheduleAtFixedRate(this, MS_BETWEEN_CHECKS,
|
||||
MS_BETWEEN_CHECKS);
|
||||
return true;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
// Pass the old, current and new secrets to the recogniser
|
||||
for(TemporarySecret s : oldSecrets.values())
|
||||
tagRecogniser.addSecret(s);
|
||||
for(TemporarySecret s : currentSecrets.values())
|
||||
tagRecogniser.addSecret(s);
|
||||
for(TemporarySecret s : newSecrets.values())
|
||||
tagRecogniser.addSecret(s);
|
||||
// Schedule periodic key rotation
|
||||
timer.scheduleAtFixedRate(this, MS_BETWEEN_CHECKS, MS_BETWEEN_CHECKS);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Assigns secrets to the appropriate maps and returns any dead secrets
|
||||
// Locking: this
|
||||
// Locking: synchLock
|
||||
private Collection<TemporarySecret> assignSecretsToMaps(long now,
|
||||
Collection<TemporarySecret> secrets) {
|
||||
Collection<TemporarySecret> dead = new ArrayList<TemporarySecret>();
|
||||
@@ -142,7 +154,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
}
|
||||
|
||||
// Replaces the given secrets and returns any secrets created
|
||||
// Locking: this
|
||||
// Locking: synchLock
|
||||
private Collection<TemporarySecret> replaceDeadSecrets(long now,
|
||||
Collection<TemporarySecret> dead) {
|
||||
// If there are several dead secrets for an endpoint, use the newest
|
||||
@@ -200,105 +212,125 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
return created;
|
||||
}
|
||||
|
||||
public synchronized boolean stop() {
|
||||
eventBus.removeListener(this);
|
||||
timer.cancel();
|
||||
tagRecogniser.removeSecrets();
|
||||
maxLatencies.clear();
|
||||
oldSecrets.clear();
|
||||
currentSecrets.clear();
|
||||
newSecrets.clear();
|
||||
return true;
|
||||
public boolean stop() {
|
||||
synchLock.lock();
|
||||
try {
|
||||
eventBus.removeListener(this);
|
||||
timer.cancel();
|
||||
tagRecogniser.removeSecrets();
|
||||
maxLatencies.clear();
|
||||
oldSecrets.clear();
|
||||
currentSecrets.clear();
|
||||
newSecrets.clear();
|
||||
return true;
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized StreamContext getStreamContext(ContactId c,
|
||||
public StreamContext getStreamContext(ContactId c,
|
||||
TransportId t) {
|
||||
TemporarySecret s = currentSecrets.get(new EndpointKey(c, t));
|
||||
if(s == null) {
|
||||
LOG.info("No secret for endpoint");
|
||||
return null;
|
||||
}
|
||||
long streamNumber;
|
||||
synchLock.lock();
|
||||
try {
|
||||
streamNumber = db.incrementStreamCounter(c, t, s.getPeriod());
|
||||
if(streamNumber == -1) {
|
||||
LOG.info("No counter for period");
|
||||
TemporarySecret s = currentSecrets.get(new EndpointKey(c, t));
|
||||
if(s == null) {
|
||||
LOG.info("No secret for endpoint");
|
||||
return null;
|
||||
}
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return null;
|
||||
long streamNumber;
|
||||
try {
|
||||
streamNumber = db.incrementStreamCounter(c, t, s.getPeriod());
|
||||
if(streamNumber == -1) {
|
||||
LOG.info("No counter for period");
|
||||
return null;
|
||||
}
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return null;
|
||||
}
|
||||
byte[] secret = s.getSecret();
|
||||
return new StreamContext(c, t, secret, streamNumber, s.getAlice());
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
byte[] secret = s.getSecret();
|
||||
return new StreamContext(c, t, secret, streamNumber, s.getAlice());
|
||||
}
|
||||
|
||||
public synchronized void endpointAdded(Endpoint ep, int maxLatency,
|
||||
public void endpointAdded(Endpoint ep, int maxLatency,
|
||||
byte[] initialSecret) {
|
||||
maxLatencies.put(ep.getTransportId(), maxLatency);
|
||||
// Work out which rotation period we're in
|
||||
long elapsed = clock.currentTimeMillis() - ep.getEpoch();
|
||||
long rotation = maxLatency + MAX_CLOCK_DIFFERENCE;
|
||||
long period = (elapsed / rotation) + 1;
|
||||
if(period < 1) throw new IllegalStateException();
|
||||
// Derive the old, current and new secrets
|
||||
byte[] b1 = initialSecret;
|
||||
for(long p = 0; p < period; p++)
|
||||
b1 = crypto.deriveNextSecret(b1, p);
|
||||
byte[] b2 = crypto.deriveNextSecret(b1, period);
|
||||
byte[] b3 = crypto.deriveNextSecret(b2, period + 1);
|
||||
TemporarySecret s1 = new TemporarySecret(ep, period - 1, b1);
|
||||
TemporarySecret s2 = new TemporarySecret(ep, period, b2);
|
||||
TemporarySecret s3 = new TemporarySecret(ep, period + 1, b3);
|
||||
// Add the incoming secrets to their respective maps
|
||||
EndpointKey k = new EndpointKey(ep);
|
||||
oldSecrets.put(k, s1);
|
||||
currentSecrets.put(k, s2);
|
||||
newSecrets.put(k, s3);
|
||||
// Store the new secrets
|
||||
synchLock.lock();
|
||||
try {
|
||||
db.addSecrets(Arrays.asList(s1, s2, s3));
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return;
|
||||
maxLatencies.put(ep.getTransportId(), maxLatency);
|
||||
// Work out which rotation period we're in
|
||||
long elapsed = clock.currentTimeMillis() - ep.getEpoch();
|
||||
long rotation = maxLatency + MAX_CLOCK_DIFFERENCE;
|
||||
long period = (elapsed / rotation) + 1;
|
||||
if(period < 1) throw new IllegalStateException();
|
||||
// Derive the old, current and new secrets
|
||||
byte[] b1 = initialSecret;
|
||||
for(long p = 0; p < period; p++)
|
||||
b1 = crypto.deriveNextSecret(b1, p);
|
||||
byte[] b2 = crypto.deriveNextSecret(b1, period);
|
||||
byte[] b3 = crypto.deriveNextSecret(b2, period + 1);
|
||||
TemporarySecret s1 = new TemporarySecret(ep, period - 1, b1);
|
||||
TemporarySecret s2 = new TemporarySecret(ep, period, b2);
|
||||
TemporarySecret s3 = new TemporarySecret(ep, period + 1, b3);
|
||||
// Add the incoming secrets to their respective maps
|
||||
EndpointKey k = new EndpointKey(ep);
|
||||
oldSecrets.put(k, s1);
|
||||
currentSecrets.put(k, s2);
|
||||
newSecrets.put(k, s3);
|
||||
// Store the new secrets
|
||||
try {
|
||||
db.addSecrets(Arrays.asList(s1, s2, s3));
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return;
|
||||
}
|
||||
// Pass the new secrets to the recogniser
|
||||
tagRecogniser.addSecret(s1);
|
||||
tagRecogniser.addSecret(s2);
|
||||
tagRecogniser.addSecret(s3);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
// Pass the new secrets to the recogniser
|
||||
tagRecogniser.addSecret(s1);
|
||||
tagRecogniser.addSecret(s2);
|
||||
tagRecogniser.addSecret(s3);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void run() {
|
||||
// Rebuild the maps because we may be running a whole period late
|
||||
Collection<TemporarySecret> secrets = new ArrayList<TemporarySecret>();
|
||||
secrets.addAll(oldSecrets.values());
|
||||
secrets.addAll(currentSecrets.values());
|
||||
secrets.addAll(newSecrets.values());
|
||||
oldSecrets.clear();
|
||||
currentSecrets.clear();
|
||||
newSecrets.clear();
|
||||
// Work out what phase of its lifecycle each secret is in
|
||||
long now = clock.currentTimeMillis();
|
||||
Collection<TemporarySecret> dead = assignSecretsToMaps(now, secrets);
|
||||
// Remove any dead secrets from the recogniser
|
||||
for(TemporarySecret s : dead) {
|
||||
ContactId c = s.getContactId();
|
||||
TransportId t = s.getTransportId();
|
||||
long period = s.getPeriod();
|
||||
tagRecogniser.removeSecret(c, t, period);
|
||||
}
|
||||
// Replace any dead secrets
|
||||
Collection<TemporarySecret> created = replaceDeadSecrets(now, dead);
|
||||
if(!created.isEmpty()) {
|
||||
// Store any secrets that have been created
|
||||
try {
|
||||
db.addSecrets(created);
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
public void run() {
|
||||
synchLock.lock();
|
||||
try {
|
||||
// Rebuild the maps because we may be running a whole period late
|
||||
Collection<TemporarySecret> secrets = new ArrayList<TemporarySecret>();
|
||||
secrets.addAll(oldSecrets.values());
|
||||
secrets.addAll(currentSecrets.values());
|
||||
secrets.addAll(newSecrets.values());
|
||||
oldSecrets.clear();
|
||||
currentSecrets.clear();
|
||||
newSecrets.clear();
|
||||
// Work out what phase of its lifecycle each secret is in
|
||||
long now = clock.currentTimeMillis();
|
||||
Collection<TemporarySecret> dead = assignSecretsToMaps(now, secrets);
|
||||
// Remove any dead secrets from the recogniser
|
||||
for(TemporarySecret s : dead) {
|
||||
ContactId c = s.getContactId();
|
||||
TransportId t = s.getTransportId();
|
||||
long period = s.getPeriod();
|
||||
tagRecogniser.removeSecret(c, t, period);
|
||||
}
|
||||
// Pass any secrets that have been created to the recogniser
|
||||
for(TemporarySecret s : created) tagRecogniser.addSecret(s);
|
||||
// Replace any dead secrets
|
||||
Collection<TemporarySecret> created = replaceDeadSecrets(now, dead);
|
||||
if(!created.isEmpty()) {
|
||||
// Store any secrets that have been created
|
||||
try {
|
||||
db.addSecrets(created);
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
}
|
||||
// Pass any secrets that have been created to the recogniser
|
||||
for(TemporarySecret s : created) tagRecogniser.addSecret(s);
|
||||
}
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,14 +347,14 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
}
|
||||
}
|
||||
|
||||
// Locking: this
|
||||
// Locking: synchLock
|
||||
private void removeSecrets(ContactId c, Map<?, TemporarySecret> m) {
|
||||
Iterator<TemporarySecret> it = m.values().iterator();
|
||||
while(it.hasNext())
|
||||
if(it.next().getContactId().equals(c)) it.remove();
|
||||
}
|
||||
|
||||
// Locking: this
|
||||
// Locking: synchLock
|
||||
private void removeSecrets(TransportId t, Map<?, TemporarySecret> m) {
|
||||
Iterator<TemporarySecret> it = m.values().iterator();
|
||||
while(it.hasNext())
|
||||
@@ -371,10 +403,13 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
public void run() {
|
||||
ContactId c = event.getContactId();
|
||||
tagRecogniser.removeSecrets(c);
|
||||
synchronized(KeyManagerImpl.this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
removeSecrets(c, oldSecrets);
|
||||
removeSecrets(c, currentSecrets);
|
||||
removeSecrets(c, newSecrets);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -389,8 +424,11 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized(KeyManagerImpl.this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
maxLatencies.put(event.getTransportId(), event.getMaxLatency());
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -407,11 +445,14 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
|
||||
public void run() {
|
||||
TransportId t = event.getTransportId();
|
||||
tagRecogniser.removeSecrets(t);
|
||||
synchronized(KeyManagerImpl.this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
maxLatencies.remove(t);
|
||||
removeSecrets(t, oldSecrets);
|
||||
removeSecrets(t, currentSecrets);
|
||||
removeSecrets(t, newSecrets);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ package org.briarproject.transport;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
@@ -18,7 +20,9 @@ class TagRecogniserImpl implements TagRecogniser {
|
||||
|
||||
private final CryptoComponent crypto;
|
||||
private final DatabaseComponent db;
|
||||
// Locking: this
|
||||
private final Lock synchLock = new ReentrantLock();
|
||||
|
||||
// Locking: synchLock
|
||||
private final Map<TransportId, TransportTagRecogniser> recognisers;
|
||||
|
||||
@Inject
|
||||
@@ -31,8 +35,11 @@ class TagRecogniserImpl implements TagRecogniser {
|
||||
public StreamContext recogniseTag(TransportId t, byte[] tag)
|
||||
throws DbException {
|
||||
TransportTagRecogniser r;
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
r = recognisers.get(t);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
if(r == null) return null;
|
||||
return r.recogniseTag(tag);
|
||||
@@ -41,35 +48,58 @@ class TagRecogniserImpl implements TagRecogniser {
|
||||
public void addSecret(TemporarySecret s) {
|
||||
TransportId t = s.getTransportId();
|
||||
TransportTagRecogniser r;
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
r = recognisers.get(t);
|
||||
if(r == null) {
|
||||
r = new TransportTagRecogniser(crypto, db, t);
|
||||
recognisers.put(t, r);
|
||||
}
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
r.addSecret(s);
|
||||
}
|
||||
|
||||
public void removeSecret(ContactId c, TransportId t, long period) {
|
||||
TransportTagRecogniser r;
|
||||
synchronized(this) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
r = recognisers.get(t);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
if(r != null) r.removeSecret(c, period);
|
||||
}
|
||||
|
||||
public synchronized void removeSecrets(ContactId c) {
|
||||
for(TransportTagRecogniser r : recognisers.values())
|
||||
r.removeSecrets(c);
|
||||
public void removeSecrets(ContactId c) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
for(TransportTagRecogniser r : recognisers.values())
|
||||
r.removeSecrets(c);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void removeSecrets(TransportId t) {
|
||||
recognisers.remove(t);
|
||||
public void removeSecrets(TransportId t) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
recognisers.remove(t);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public synchronized void removeSecrets() {
|
||||
for(TransportTagRecogniser r : recognisers.values())
|
||||
r.removeSecrets();
|
||||
public void removeSecrets() {
|
||||
synchLock.lock();
|
||||
try {
|
||||
for(TransportTagRecogniser r : recognisers.values())
|
||||
r.removeSecrets();
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,8 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.briarproject.api.Bytes;
|
||||
import org.briarproject.api.ContactId;
|
||||
@@ -27,8 +29,11 @@ class TransportTagRecogniser {
|
||||
private final CryptoComponent crypto;
|
||||
private final DatabaseComponent db;
|
||||
private final TransportId transportId;
|
||||
private final Map<Bytes, TagContext> tagMap; // Locking: this
|
||||
private final Map<RemovalKey, RemovalContext> removalMap; // Locking: this
|
||||
private final Lock synchLock = new ReentrantLock();
|
||||
|
||||
// The following are locking: synchLock
|
||||
private final Map<Bytes, TagContext> tagMap;
|
||||
private final Map<RemovalKey, RemovalContext> removalMap;
|
||||
|
||||
TransportTagRecogniser(CryptoComponent crypto, DatabaseComponent db,
|
||||
TransportId transportId) {
|
||||
@@ -39,61 +44,76 @@ class TransportTagRecogniser {
|
||||
removalMap = new HashMap<RemovalKey, RemovalContext>();
|
||||
}
|
||||
|
||||
synchronized StreamContext recogniseTag(byte[] tag) throws DbException {
|
||||
TagContext t = tagMap.remove(new Bytes(tag));
|
||||
if(t == null) return null; // The tag was not expected
|
||||
// Update the reordering window and the expected tags
|
||||
SecretKey key = crypto.deriveTagKey(t.secret, !t.alice);
|
||||
for(long streamNumber : t.window.setSeen(t.streamNumber)) {
|
||||
byte[] tag1 = new byte[TAG_LENGTH];
|
||||
crypto.encodeTag(tag1, key, streamNumber);
|
||||
if(streamNumber < t.streamNumber) {
|
||||
TagContext removed = tagMap.remove(new Bytes(tag1));
|
||||
assert removed != null;
|
||||
} else {
|
||||
TagContext added = new TagContext(t, streamNumber);
|
||||
TagContext duplicate = tagMap.put(new Bytes(tag1), added);
|
||||
StreamContext recogniseTag(byte[] tag) throws DbException {
|
||||
synchLock.lock();
|
||||
try {
|
||||
TagContext t = tagMap.remove(new Bytes(tag));
|
||||
if(t == null) return null; // The tag was not expected
|
||||
// Update the reordering window and the expected tags
|
||||
SecretKey key = crypto.deriveTagKey(t.secret, !t.alice);
|
||||
for(long streamNumber : t.window.setSeen(t.streamNumber)) {
|
||||
byte[] tag1 = new byte[TAG_LENGTH];
|
||||
crypto.encodeTag(tag1, key, streamNumber);
|
||||
if(streamNumber < t.streamNumber) {
|
||||
TagContext removed = tagMap.remove(new Bytes(tag1));
|
||||
assert removed != null;
|
||||
} else {
|
||||
TagContext added = new TagContext(t, streamNumber);
|
||||
TagContext duplicate = tagMap.put(new Bytes(tag1), added);
|
||||
assert duplicate == null;
|
||||
}
|
||||
}
|
||||
// Store the updated reordering window in the DB
|
||||
db.setReorderingWindow(t.contactId, transportId, t.period,
|
||||
t.window.getCentre(), t.window.getBitmap());
|
||||
return new StreamContext(t.contactId, transportId, t.secret,
|
||||
t.streamNumber, t.alice);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void addSecret(TemporarySecret s) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
ContactId contactId = s.getContactId();
|
||||
boolean alice = s.getAlice();
|
||||
long period = s.getPeriod();
|
||||
byte[] secret = s.getSecret();
|
||||
long centre = s.getWindowCentre();
|
||||
byte[] bitmap = s.getWindowBitmap();
|
||||
// Create the reordering window and the expected tags
|
||||
SecretKey key = crypto.deriveTagKey(secret, !alice);
|
||||
ReorderingWindow window = new ReorderingWindow(centre, bitmap);
|
||||
for(long streamNumber : window.getUnseen()) {
|
||||
byte[] tag = new byte[TAG_LENGTH];
|
||||
crypto.encodeTag(tag, key, streamNumber);
|
||||
TagContext added = new TagContext(contactId, alice, period,
|
||||
secret, window, streamNumber);
|
||||
TagContext duplicate = tagMap.put(new Bytes(tag), added);
|
||||
assert duplicate == null;
|
||||
}
|
||||
// Create a removal context to remove the window and the tags later
|
||||
RemovalContext r = new RemovalContext(window, secret, alice);
|
||||
removalMap.put(new RemovalKey(contactId, period), r);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
// Store the updated reordering window in the DB
|
||||
db.setReorderingWindow(t.contactId, transportId, t.period,
|
||||
t.window.getCentre(), t.window.getBitmap());
|
||||
return new StreamContext(t.contactId, transportId, t.secret,
|
||||
t.streamNumber, t.alice);
|
||||
}
|
||||
|
||||
synchronized void addSecret(TemporarySecret s) {
|
||||
ContactId contactId = s.getContactId();
|
||||
boolean alice = s.getAlice();
|
||||
long period = s.getPeriod();
|
||||
byte[] secret = s.getSecret();
|
||||
long centre = s.getWindowCentre();
|
||||
byte[] bitmap = s.getWindowBitmap();
|
||||
// Create the reordering window and the expected tags
|
||||
SecretKey key = crypto.deriveTagKey(secret, !alice);
|
||||
ReorderingWindow window = new ReorderingWindow(centre, bitmap);
|
||||
for(long streamNumber : window.getUnseen()) {
|
||||
byte[] tag = new byte[TAG_LENGTH];
|
||||
crypto.encodeTag(tag, key, streamNumber);
|
||||
TagContext added = new TagContext(contactId, alice, period,
|
||||
secret, window, streamNumber);
|
||||
TagContext duplicate = tagMap.put(new Bytes(tag), added);
|
||||
assert duplicate == null;
|
||||
void removeSecret(ContactId contactId, long period) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
RemovalKey k = new RemovalKey(contactId, period);
|
||||
RemovalContext removed = removalMap.remove(k);
|
||||
if(removed == null) throw new IllegalArgumentException();
|
||||
removeSecret(removed);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
// Create a removal context to remove the window and the tags later
|
||||
RemovalContext r = new RemovalContext(window, secret, alice);
|
||||
removalMap.put(new RemovalKey(contactId, period), r);
|
||||
}
|
||||
|
||||
synchronized void removeSecret(ContactId contactId, long period) {
|
||||
RemovalKey k = new RemovalKey(contactId, period);
|
||||
RemovalContext removed = removalMap.remove(k);
|
||||
if(removed == null) throw new IllegalArgumentException();
|
||||
removeSecret(removed);
|
||||
}
|
||||
|
||||
// Locking: this
|
||||
// Locking: synchLock
|
||||
private void removeSecret(RemovalContext r) {
|
||||
// Remove the expected tags
|
||||
SecretKey key = crypto.deriveTagKey(r.secret, !r.alice);
|
||||
@@ -105,17 +125,28 @@ class TransportTagRecogniser {
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeSecrets(ContactId c) {
|
||||
Collection<RemovalKey> keysToRemove = new ArrayList<RemovalKey>();
|
||||
for(RemovalKey k : removalMap.keySet())
|
||||
if(k.contactId.equals(c)) keysToRemove.add(k);
|
||||
for(RemovalKey k : keysToRemove) removeSecret(k.contactId, k.period);
|
||||
void removeSecrets(ContactId c) {
|
||||
synchLock.lock();
|
||||
try {
|
||||
Collection<RemovalKey> keysToRemove = new ArrayList<RemovalKey>();
|
||||
for(RemovalKey k : removalMap.keySet())
|
||||
if(k.contactId.equals(c)) keysToRemove.add(k);
|
||||
for(RemovalKey k : keysToRemove)
|
||||
removeSecret(k.contactId, k.period);
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeSecrets() {
|
||||
for(RemovalContext r : removalMap.values()) removeSecret(r);
|
||||
assert tagMap.isEmpty();
|
||||
removalMap.clear();
|
||||
void removeSecrets() {
|
||||
synchLock.lock();
|
||||
try {
|
||||
for(RemovalContext r : removalMap.values()) removeSecret(r);
|
||||
assert tagMap.isEmpty();
|
||||
removalMap.clear();
|
||||
} finally {
|
||||
synchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static class TagContext {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
package org.briarproject.util;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class LatchedReference<T> {
|
||||
@@ -23,7 +24,7 @@ public class LatchedReference<T> {
|
||||
}
|
||||
|
||||
public T waitForReference(long timeout) throws InterruptedException {
|
||||
latch.await(timeout, TimeUnit.MILLISECONDS);
|
||||
latch.await(timeout, MILLISECONDS);
|
||||
return reference.get();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user