Refactor KeyManager and TagRecogniser. #55

This commit is contained in:
akwizgran
2015-02-12 09:11:24 +00:00
parent 878a70620d
commit 9868feeb2a
60 changed files with 2123 additions and 3840 deletions

View File

@@ -1,9 +1,5 @@
package org.briarproject.db;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorId;
import org.briarproject.api.Contact;
@@ -25,8 +21,11 @@ import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.transport.Endpoint;
import org.briarproject.api.transport.TemporarySecret;
import org.briarproject.api.transport.TransportKeys;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
// FIXME: Document the preconditions for calling each method
@@ -89,13 +88,6 @@ interface Database<T> {
ContactId addContact(T txn, Author remote, AuthorId local)
throws DbException;
/**
* Stores an endpoint.
* <p>
* Locking: write.
*/
void addEndpoint(T txn, Endpoint ep) throws DbException;
/**
* Subscribes to a group, or returns false if the user already has the
* maximum number of subscriptions.
@@ -125,15 +117,6 @@ interface Database<T> {
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
/**
* Stores the given temporary secrets and deletes any secrets that have
* been made obsolete.
* <p>
* Locking: write.
*/
void addSecrets(T txn, Collection<TemporarySecret> secrets)
throws DbException;
/**
* Initialises the status of the given message with respect to the given
* contact.
@@ -154,6 +137,13 @@ interface Database<T> {
boolean addTransport(T txn, TransportId t, int maxLatency)
throws DbException;
/**
* Stores the given transport keys for a newly added contact.
* <p>
* Locking: write.
*/
void addTransportKeys(T txn, ContactId c, TransportKeys k) throws DbException;
/**
* Makes a group visible to the given contact.
* <p>
@@ -270,13 +260,6 @@ interface Database<T> {
*/
Collection<ContactId> getContacts(T txn, AuthorId a) throws DbException;
/**
* Returns all endpoints.
* <p>
* Locking: read.
*/
Collection<Endpoint> getEndpoints(T txn) throws DbException;
/**
* Returns the amount of free storage space available to the database, in
* bytes. This is based on the minimum of the space available on the device
@@ -461,13 +444,6 @@ interface Database<T> {
RetentionUpdate getRetentionUpdate(T txn, ContactId c, int maxLatency)
throws DbException;
/**
* Returns all temporary secrets.
* <p>
* Locking: read.
*/
Collection<TemporarySecret> getSecrets(T txn) throws DbException;
/**
* Returns all settings.
* <p>
@@ -509,7 +485,15 @@ interface Database<T> {
throws DbException;
/**
* Returns the maximum latencies of all supported transports.
* Returns all transport keys for the given transport.
* <p>
* Locking: read.
*/
Map<ContactId, TransportKeys> getTransportKeys(T txn, TransportId t)
throws DbException;
/**
* Returns the maximum latencies in milliseconds of all transports.
* <p>
* Locking: read.
*/
@@ -540,14 +524,13 @@ interface Database<T> {
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
/**
* Increments the outgoing stream counter for the given endpoint in the
* given rotation period and returns the old value, or -1 if the counter
* does not exist.
* Increments the outgoing stream counter for the given contact and
* transport in the given rotation period.
* <p>
* Locking: write.
*/
long incrementStreamCounter(T txn, ContactId c, TransportId t, long period)
throws DbException;
void incrementStreamCounter(T txn, ContactId c, TransportId t,
long rotationPeriod) throws DbException;
/**
* Increments the retention time versions for all contacts to indicate that
@@ -692,13 +675,13 @@ interface Database<T> {
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
/**
* Sets the reordering window for the given endpoint in the given rotation
* period.
* Sets the reordering window for the given contact and transport in the
* given rotation period.
* <p>
* Locking: write.
*/
void setReorderingWindow(T txn, ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException;
void setReorderingWindow(T txn, ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException;
/**
* Updates the groups to which the given contact subscribes and returns
@@ -716,7 +699,7 @@ interface Database<T> {
* <p>
* Locking: write.
*/
public void setInboxGroup(T txn, ContactId c, Group g) throws DbException;
void setInboxGroup(T txn, ContactId c, Group g) throws DbException;
/**
* Marks a message as read or unread.
@@ -798,4 +781,12 @@ interface Database<T> {
*/
void updateExpiryTime(T txn, ContactId c, MessageId m, int maxLatency)
throws DbException;
/**
* Stores the given transport keys, deleting any keys they have replaced.
* <p>
* Locking: write.
*/
void updateTransportKeys(T txn, Map<ContactId, TransportKeys> keys)
throws DbException;
}

View File

@@ -1,25 +1,5 @@
package org.briarproject.db;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.db.DatabaseConstants.BYTES_PER_SWEEP;
import static org.briarproject.db.DatabaseConstants.CRITICAL_FREE_SPACE;
import static org.briarproject.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.db.DatabaseConstants.MAX_TRANSACTIONS_BETWEEN_SPACE_CHECKS;
import static org.briarproject.db.DatabaseConstants.MIN_FREE_SPACE;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorId;
import org.briarproject.api.Contact;
@@ -75,8 +55,29 @@ import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.transport.Endpoint;
import org.briarproject.api.transport.TemporarySecret;
import org.briarproject.api.transport.TransportKeys;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.db.DatabaseConstants.BYTES_PER_SWEEP;
import static org.briarproject.db.DatabaseConstants.CRITICAL_FREE_SPACE;
import static org.briarproject.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.db.DatabaseConstants.MAX_TRANSACTIONS_BETWEEN_SPACE_CHECKS;
import static org.briarproject.db.DatabaseConstants.MIN_FREE_SPACE;
/**
* An implementation of DatabaseComponent using reentrant read-write locks.
@@ -85,7 +86,7 @@ import org.briarproject.api.transport.TemporarySecret;
* implementation is safe on a given JVM.
*/
class DatabaseComponentImpl<T> implements DatabaseComponent,
DatabaseCleaner.Callback {
DatabaseCleaner.Callback {
private static final Logger LOG =
Logger.getLogger(DatabaseComponentImpl.class.getName());
@@ -180,26 +181,6 @@ DatabaseCleaner.Callback {
return c;
}
public void addEndpoint(Endpoint ep) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, ep.getContactId()))
throw new NoSuchContactException();
if (!db.containsTransport(txn, ep.getTransportId()))
throw new NoSuchTransportException();
db.addEndpoint(txn, ep);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public boolean addGroup(Group g) throws DbException {
boolean added = false;
lock.writeLock().lock();
@@ -290,30 +271,6 @@ DatabaseCleaner.Callback {
}
}
public void addSecrets(Collection<TemporarySecret> secrets)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<TemporarySecret> relevant =
new ArrayList<TemporarySecret>();
for (TemporarySecret s : secrets) {
if (db.containsContact(txn, s.getContactId()))
if (db.containsTransport(txn, s.getTransportId()))
relevant.add(s);
}
if (!secrets.isEmpty()) db.addSecrets(txn, relevant);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public boolean addTransport(TransportId t, int maxLatency)
throws DbException {
boolean added;
@@ -334,6 +291,27 @@ DatabaseCleaner.Callback {
return added;
}
public void addTransportKeys(ContactId c, TransportKeys k)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (!db.containsTransport(txn, k.getTransportId()))
throw new NoSuchTransportException();
db.addTransportKeys(txn, c, k);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public Ack generateAck(ContactId c, int maxMessages) throws DbException {
Collection<MessageId> ids;
lock.writeLock().lock();
@@ -883,23 +861,6 @@ DatabaseCleaner.Callback {
}
}
public Collection<TemporarySecret> getSecrets() throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<TemporarySecret> secrets = db.getSecrets(txn);
db.commitTransaction(txn);
return secrets;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Settings getSettings() throws DbException {
lock.readLock().lock();
try {
@@ -934,6 +895,27 @@ DatabaseCleaner.Callback {
}
}
public Map<ContactId, TransportKeys> getTransportKeys(TransportId t)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
Map<ContactId, TransportKeys> keys =
db.getTransportKeys(txn, t);
db.commitTransaction(txn);
return keys;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Map<TransportId, Integer> getTransportLatencies()
throws DbException {
lock.readLock().lock();
@@ -989,8 +971,8 @@ DatabaseCleaner.Callback {
}
}
public long incrementStreamCounter(ContactId c, TransportId t,
long period) throws DbException {
public void incrementStreamCounter(ContactId c, TransportId t,
long rotationPeriod) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
@@ -999,9 +981,8 @@ DatabaseCleaner.Callback {
throw new NoSuchContactException();
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
long counter = db.incrementStreamCounter(txn, c, t, period);
db.incrementStreamCounter(txn, c, t, rotationPeriod);
db.commitTransaction(txn);
return counter;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
@@ -1404,27 +1385,6 @@ DatabaseCleaner.Callback {
eventBus.broadcast(new TransportRemovedEvent(t));
}
public void setReorderingWindow(ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
db.setReorderingWindow(txn, c, t, period, centre, bitmap);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void setInboxGroup(ContactId c, Group g) throws DbException {
lock.writeLock().lock();
try {
@@ -1480,6 +1440,27 @@ DatabaseCleaner.Callback {
}
}
public void setReorderingWindow(ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
db.setReorderingWindow(txn, c, t, rotationPeriod, base, bitmap);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
Collection<ContactId> affected = new ArrayList<ContactId>();
@@ -1552,6 +1533,33 @@ DatabaseCleaner.Callback {
eventBus.broadcast(new LocalSubscriptionsUpdatedEvent(affected));
}
public void updateTransportKeys(Map<ContactId, TransportKeys> keys)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
Map<ContactId, TransportKeys> filtered =
new HashMap<ContactId, TransportKeys>();
for (Entry<ContactId, TransportKeys> e : keys.entrySet()) {
ContactId c = e.getKey();
TransportKeys k = e.getValue();
if (db.containsContact(txn, c)
&& db.containsTransport(txn, k.getTransportId())) {
filtered.put(c, k);
}
}
db.updateTransportKeys(txn, filtered);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void checkFreeSpaceAndClean() throws DbException {
long freeSpace = db.getFreeSpace();
if (LOG.isLoggable(INFO)) LOG.info(freeSpace + " bytes free space");

View File

@@ -1,14 +1,33 @@
package org.briarproject.db;
import static java.sql.Types.BINARY;
import static java.sql.Types.VARCHAR;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.Author.Status.ANONYMOUS;
import static org.briarproject.api.Author.Status.UNKNOWN;
import static org.briarproject.api.Author.Status.VERIFIED;
import static org.briarproject.api.messaging.MessagingConstants.MAX_SUBSCRIPTIONS;
import static org.briarproject.api.messaging.MessagingConstants.RETENTION_GRANULARITY;
import static org.briarproject.db.ExponentialBackoff.calculateExpiry;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorId;
import org.briarproject.api.Contact;
import org.briarproject.api.ContactId;
import org.briarproject.api.LocalAuthor;
import org.briarproject.api.Settings;
import org.briarproject.api.TransportConfig;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.crypto.SecretKey;
import org.briarproject.api.db.DbClosedException;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.MessageHeader;
import org.briarproject.api.db.MessageHeader.State;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.GroupId;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.system.Clock;
import org.briarproject.api.transport.IncomingKeys;
import org.briarproject.api.transport.OutgoingKeys;
import org.briarproject.api.transport.TransportKeys;
import java.io.IOException;
import java.sql.Connection;
@@ -32,32 +51,15 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import org.briarproject.api.Author;
import org.briarproject.api.AuthorId;
import org.briarproject.api.Contact;
import org.briarproject.api.ContactId;
import org.briarproject.api.LocalAuthor;
import org.briarproject.api.Settings;
import org.briarproject.api.TransportConfig;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.db.DbClosedException;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.MessageHeader;
import org.briarproject.api.db.MessageHeader.State;
import org.briarproject.api.messaging.Group;
import org.briarproject.api.messaging.GroupId;
import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.system.Clock;
import org.briarproject.api.transport.Endpoint;
import org.briarproject.api.transport.TemporarySecret;
import static java.sql.Types.BINARY;
import static java.sql.Types.VARCHAR;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.Author.Status.ANONYMOUS;
import static org.briarproject.api.Author.Status.UNKNOWN;
import static org.briarproject.api.Author.Status.VERIFIED;
import static org.briarproject.api.messaging.MessagingConstants.MAX_SUBSCRIPTIONS;
import static org.briarproject.api.messaging.MessagingConstants.RETENTION_GRANULARITY;
import static org.briarproject.db.ExponentialBackoff.calculateExpiry;
/**
* A generic database implementation that can be used with any JDBC-compatible
@@ -65,8 +67,8 @@ import org.briarproject.api.transport.TemporarySecret;
*/
abstract class JdbcDatabase implements Database<Connection> {
private static final int SCHEMA_VERSION = 9;
private static final int MIN_SCHEMA_VERSION = 9;
private static final int SCHEMA_VERSION = 10;
private static final int MIN_SCHEMA_VERSION = 10;
private static final String CREATE_SETTINGS =
"CREATE TABLE settings"
@@ -277,13 +279,16 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_ENDPOINTS =
"CREATE TABLE endpoints"
private static final String CREATE_INCOMING_KEYS =
"CREATE TABLE incomingKeys"
+ " (contactId INT NOT NULL,"
+ " transportId VARCHAR NOT NULL,"
+ " epoch BIGINT NOT NULL,"
+ " alice BOOLEAN NOT NULL,"
+ " PRIMARY KEY (contactId, transportId),"
+ " period BIGINT NOT NULL,"
+ " tagKey SECRET NOT NULL,"
+ " headerKey SECRET NOT NULL,"
+ " base BIGINT NOT NULL,"
+ " bitmap BINARY NOT NULL,"
+ " PRIMARY KEY (contactId, transportId, period),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE,"
@@ -291,16 +296,15 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_SECRETS =
"CREATE TABLE secrets"
private static final String CREATE_OUTGOING_KEYS =
"CREATE TABLE outgoingKeys"
+ " (contactId INT NOT NULL,"
+ " transportId VARCHAR NOT NULL,"
+ " period BIGINT NOT NULL,"
+ " secret SECRET NOT NULL,"
+ " outgoing BIGINT NOT NULL,"
+ " centre BIGINT NOT NULL,"
+ " bitmap BINARY NOT NULL,"
+ " PRIMARY KEY (contactId, transportId, period),"
+ " tagKey SECRET NOT NULL,"
+ " headerKey SECRET NOT NULL,"
+ " stream BIGINT NOT NULL,"
+ " PRIMARY KEY (contactId, transportId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE,"
@@ -324,6 +328,7 @@ abstract class JdbcDatabase implements Database<Connection> {
private boolean closed = false; // Locking: connectionsLock
protected abstract Connection createConnection() throws SQLException;
protected abstract void flushBuffersToDisk(Statement s) throws SQLException;
private final Lock connectionsLock = new ReentrantLock();
@@ -339,7 +344,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
protected void open(String driverClass, boolean reopen) throws DbException,
IOException {
IOException {
// Load the JDBC driver
try {
Class.forName(driverClass);
@@ -382,7 +387,7 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
if (rs != null) rs.close();
} catch (SQLException e) {
if (LOG.isLoggable(WARNING))LOG.log(WARNING, e.toString(), e);
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
@@ -390,7 +395,7 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
if (s != null) s.close();
} catch (SQLException e) {
if (LOG.isLoggable(WARNING))LOG.log(WARNING, e.toString(), e);
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
@@ -418,8 +423,8 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_CONTACT_TRANSPORT_PROPS));
s.executeUpdate(insertTypeNames(CREATE_CONTACT_TRANSPORT_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_ENDPOINTS));
s.executeUpdate(insertTypeNames(CREATE_SECRETS));
s.executeUpdate(insertTypeNames(CREATE_INCOMING_KEYS));
s.executeUpdate(insertTypeNames(CREATE_OUTGOING_KEYS));
s.close();
} catch (SQLException e) {
tryToClose(s);
@@ -480,7 +485,8 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
txn.close();
} catch (SQLException e1) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e1.toString(), e1);
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e1.toString(), e1);
}
// Whatever happens, allow the database to close
connectionsLock.lock();
@@ -679,26 +685,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addEndpoint(Connection txn, Endpoint ep) throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO endpoints"
+ " (contactId, transportId, epoch, alice)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, ep.getContactId().getInt());
ps.setString(2, ep.getTransportId().getString());
ps.setLong(3, ep.getEpoch());
ps.setBoolean(4, ep.getAlice());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public boolean addGroup(Connection txn, Group g) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -824,52 +810,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addSecrets(Connection txn, Collection<TemporarySecret> secrets)
throws DbException {
PreparedStatement ps = null;
try {
// Store the new secrets
String sql = "INSERT INTO secrets (contactId, transportId, period,"
+ " secret, outgoing, centre, bitmap)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
for (TemporarySecret s : secrets) {
ps.setInt(1, s.getContactId().getInt());
ps.setString(2, s.getTransportId().getString());
ps.setLong(3, s.getPeriod());
ps.setBytes(4, s.getSecret());
ps.setLong(5, s.getOutgoingStreamCounter());
ps.setLong(6, s.getWindowCentre());
ps.setBytes(7, s.getWindowBitmap());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != secrets.size())
throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
// Delete any obsolete secrets
sql = "DELETE FROM secrets"
+ " WHERE contactId = ? AND transportId = ? AND period < ?";
ps = txn.prepareStatement(sql);
for (TemporarySecret s : secrets) {
ps.setInt(1, s.getContactId().getInt());
ps.setString(2, s.getTransportId().getString());
ps.setLong(3, s.getPeriod() - 2);
ps.addBatch();
}
batchAffected = ps.executeBatch();
if (batchAffected.length != secrets.size())
throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void addStatus(Connection txn, ContactId c, MessageId m, boolean ack,
boolean seen) throws DbException {
PreparedStatement ps = null;
@@ -947,6 +887,68 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addTransportKeys(Connection txn, ContactId c, TransportKeys k)
throws DbException {
PreparedStatement ps = null;
try {
// Store the incoming keys
String sql = "INSERT INTO incomingKeys (contactId, transportId,"
+ " period, tagKey, headerKey, base, bitmap)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, k.getTransportId().getString());
// Previous rotation period
IncomingKeys inPrev = k.getPreviousIncomingKeys();
ps.setLong(3, inPrev.getRotationPeriod());
ps.setBytes(4, inPrev.getTagKey().getBytes());
ps.setBytes(5, inPrev.getHeaderKey().getBytes());
ps.setLong(6, inPrev.getWindowBase());
ps.setBytes(7, inPrev.getWindowBitmap());
ps.addBatch();
// Current rotation period
IncomingKeys inCurr = k.getCurrentIncomingKeys();
ps.setLong(3, inCurr.getRotationPeriod());
ps.setBytes(4, inCurr.getTagKey().getBytes());
ps.setBytes(5, inCurr.getHeaderKey().getBytes());
ps.setLong(6, inCurr.getWindowBase());
ps.setBytes(7, inCurr.getWindowBitmap());
ps.addBatch();
// Next rotation period
IncomingKeys inNext = k.getNextIncomingKeys();
ps.setLong(3, inNext.getRotationPeriod());
ps.setBytes(4, inNext.getTagKey().getBytes());
ps.setBytes(5, inNext.getHeaderKey().getBytes());
ps.setLong(6, inNext.getWindowBase());
ps.setBytes(7, inNext.getWindowBitmap());
ps.addBatch();
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != 3) throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
// Store the outgoing keys
sql = "INSERT INTO outgoingKeys (contactId, transportId, period,"
+ " tagKey, headerKey, stream)"
+ " VALUES (?, ?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, k.getTransportId().getString());
OutgoingKeys outCurr = k.getCurrentOutgoingKeys();
ps.setLong(3, outCurr.getRotationPeriod());
ps.setBytes(4, outCurr.getTagKey().getBytes());
ps.setBytes(5, outCurr.getHeaderKey().getBytes());
ps.setLong(6, outCurr.getStreamCounter());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void addVisibility(Connection txn, ContactId c, GroupId g)
throws DbException {
PreparedStatement ps = null;
@@ -1326,32 +1328,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<Endpoint> getEndpoints(Connection txn)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT contactId, transportId, epoch, alice"
+ " FROM endpoints";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
List<Endpoint> endpoints = new ArrayList<Endpoint>();
while (rs.next()) {
ContactId contactId = new ContactId(rs.getInt(1));
TransportId transportId = new TransportId(rs.getString(2));
long epoch = rs.getLong(3);
boolean alice = rs.getBoolean(4);
endpoints.add(new Endpoint(contactId, transportId, epoch,
alice));
}
return Collections.unmodifiableList(endpoints);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Group getGroup(Connection txn, GroupId g) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -2098,43 +2074,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<TemporarySecret> getSecrets(Connection txn)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT e.contactId, e.transportId, epoch, alice,"
+ " period, secret, outgoing, centre, bitmap"
+ " FROM endpoints AS e"
+ " JOIN secrets AS s"
+ " ON e.contactId = s.contactId"
+ " AND e.transportId = s.transportId";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
List<TemporarySecret> secrets = new ArrayList<TemporarySecret>();
while (rs.next()) {
ContactId contactId = new ContactId(rs.getInt(1));
TransportId transportId = new TransportId(rs.getString(2));
long epoch = rs.getLong(3);
boolean alice = rs.getBoolean(4);
long period = rs.getLong(5);
byte[] secret = rs.getBytes(6);
long outgoing = rs.getLong(7);
long centre = rs.getLong(8);
byte[] bitmap = rs.getBytes(9);
secrets.add(new TemporarySecret(contactId, transportId, epoch,
alice, period, secret, outgoing, centre, bitmap));
}
rs.close();
ps.close();
return Collections.unmodifiableList(secrets);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Settings getSettings(Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -2317,6 +2256,67 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Map<ContactId, TransportKeys> getTransportKeys(Connection txn,
TransportId t) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Retrieve the incoming keys
String sql = "SELECT period, tagKey, headerKey, base, bitmap"
+ " FROM incomingKeys"
+ " WHERE transportId = ?"
+ " ORDER BY contactId, period";
ps = txn.prepareStatement(sql);
ps.setString(1, t.getString());
rs = ps.executeQuery();
List<IncomingKeys> inKeys = new ArrayList<IncomingKeys>();
while (rs.next()) {
long rotationPeriod = rs.getLong(1);
SecretKey tagKey = new SecretKey(rs.getBytes(2));
SecretKey headerKey = new SecretKey(rs.getBytes(3));
long windowBase = rs.getLong(4);
byte[] windowBitmap = rs.getBytes(5);
inKeys.add(new IncomingKeys(tagKey, headerKey, rotationPeriod,
windowBase, windowBitmap));
}
rs.close();
ps.close();
// Retrieve the outgoing keys in the same order
sql = "SELECT contactId, period, tagKey, headerKey, stream"
+ " FROM outgoingKeys"
+ " WHERE transportId = ?"
+ " ORDER BY contactId, period";
ps = txn.prepareStatement(sql);
ps.setString(1, t.getString());
rs = ps.executeQuery();
Map<ContactId, TransportKeys> keys =
new HashMap<ContactId, TransportKeys>();
for (int i = 0; rs.next(); i++) {
// There should be three times as many incoming keys
if (inKeys.size() < (i + 1) * 3) throw new DbStateException();
ContactId contactId = new ContactId(rs.getInt(1));
long rotationPeriod = rs.getLong(2);
SecretKey tagKey = new SecretKey(rs.getBytes(3));
SecretKey headerKey = new SecretKey(rs.getBytes(4));
long streamCounter = rs.getLong(5);
OutgoingKeys outCurr = new OutgoingKeys(tagKey, headerKey,
rotationPeriod, streamCounter);
IncomingKeys inPrev = inKeys.get(i * 3);
IncomingKeys inCurr = inKeys.get(i * 3 + 1);
IncomingKeys inNext = inKeys.get(i * 3 + 2);
keys.put(contactId, new TransportKeys(t, inPrev, inCurr,
inNext, outCurr));
}
rs.close();
ps.close();
return Collections.unmodifiableMap(keys);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Map<TransportId, Integer> getTransportLatencies(Connection txn)
throws DbException {
PreparedStatement ps = null;
@@ -2327,7 +2327,7 @@ abstract class JdbcDatabase implements Database<Connection> {
rs = ps.executeQuery();
Map<TransportId, Integer> latencies =
new HashMap<TransportId, Integer>();
while (rs.next()){
while (rs.next()) {
TransportId id = new TransportId(rs.getString(1));
latencies.put(id, rs.getInt(2));
}
@@ -2392,7 +2392,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setString(3, u.getId().getString());
ps.addBatch();
}
int [] batchAffected = ps.executeBatch();
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != updates.size())
throw new DbStateException();
for (i = 0; i < batchAffected.length; i++) {
@@ -2455,42 +2455,21 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public long incrementStreamCounter(Connection txn, ContactId c,
TransportId t, long period) throws DbException {
public void incrementStreamCounter(Connection txn, ContactId c,
TransportId t, long rotationPeriod) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Get the current stream counter
String sql = "SELECT outgoing FROM secrets"
String sql = "UPDATE outgoingKeys SET stream = stream + 1"
+ " WHERE contactId = ? AND transportId = ? AND period = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, t.getString());
ps.setLong(3, period);
rs = ps.executeQuery();
if (!rs.next()) {
rs.close();
ps.close();
return -1;
}
long streamNumber = rs.getLong(1);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
// Increment the stream counter
sql = "UPDATE secrets SET outgoing = outgoing + 1"
+ " WHERE contactId = ? AND transportId = ? AND period = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, t.getString());
ps.setLong(3, period);
ps.setLong(3, rotationPeriod);
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
return streamNumber;
} catch (SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
@@ -2929,18 +2908,19 @@ abstract class JdbcDatabase implements Database<Connection> {
throw new DbException(e);
}
}
public void setReorderingWindow(Connection txn, ContactId c, TransportId t,
long period, long centre, byte[] bitmap) throws DbException {
long rotationPeriod, long base, byte[] bitmap) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE secrets SET centre = ?, bitmap = ?"
String sql = "UPDATE incomingKeys SET base = ?, bitmap = ?"
+ " WHERE contactId = ? AND transportId = ? AND period = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, centre);
ps.setLong(1, base);
ps.setBytes(2, bitmap);
ps.setInt(3, c.getInt());
ps.setString(4, t.getString());
ps.setLong(5, period);
ps.setLong(5, rotationPeriod);
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
@@ -3139,7 +3119,7 @@ abstract class JdbcDatabase implements Database<Connection> {
public boolean setRemoteProperties(Connection txn, ContactId c,
TransportId t, TransportProperties p, long version)
throws DbException {
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -3354,4 +3334,46 @@ abstract class JdbcDatabase implements Database<Connection> {
throw new DbException(e);
}
}
public void updateTransportKeys(Connection txn,
Map<ContactId, TransportKeys> keys) throws DbException {
PreparedStatement ps = null;
try {
// Delete any existing incoming keys
String sql = "DELETE FROM incomingKeys"
+ " WHERE contactId = ?"
+ " AND transportId = ?";
ps = txn.prepareStatement(sql);
for (Entry<ContactId, TransportKeys> e : keys.entrySet()) {
ps.setInt(1, e.getKey().getInt());
ps.setString(2, e.getValue().getTransportId().getString());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != keys.size())
throw new DbStateException();
ps.close();
// Delete any existing outgoing keys
sql = "DELETE FROM outgoingKeys"
+ " WHERE contactId = ?"
+ " AND transportId = ?";
ps = txn.prepareStatement(sql);
for (Entry<ContactId, TransportKeys> e : keys.entrySet()) {
ps.setInt(1, e.getKey().getInt());
ps.setString(2, e.getValue().getTransportId().getString());
ps.addBatch();
}
batchAffected = ps.executeBatch();
if (batchAffected.length != keys.size())
throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
// Store the new keys
for (Entry<ContactId, TransportKeys> e : keys.entrySet()) {
addTransportKeys(txn, e.getKey(), e.getValue());
}
}
}