Merge branch 'client-transactions' into 'master'

Transactions for clients

This patch moves transactions out of the database component, allowing clients to perform multiple database calls in a single transaction. This should improve efficiency and reliability, at the cost of increased boilerplate for database calls.

Operations that allow hooks, such as adding and removing contacts, pass their transactions to their hooks. This ensures the whole operation is atomic and isolated, so StorageStatus is no longer needed, hooks don't need to be idempotent, and locks can be removed from clients that were using them for isolation.

This merge request is marked WIP because it will conflict with !74.

See merge request !101
This commit is contained in:
akwizgran
2016-02-17 17:24:18 +00:00
38 changed files with 1982 additions and 2283 deletions

View File

@@ -1,6 +1,5 @@
package org.briarproject.api.contact;
import org.briarproject.api.db.StorageStatus;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorId;
@@ -9,14 +8,11 @@ public class Contact {
private final ContactId id;
private final Author author;
private final AuthorId localAuthorId;
private final StorageStatus status;
public Contact(ContactId id, Author author, AuthorId localAuthorId,
StorageStatus status) {
public Contact(ContactId id, Author author, AuthorId localAuthorId) {
this.id = id;
this.author = author;
this.localAuthorId = localAuthorId;
this.status = status;
}
public ContactId getId() {
@@ -31,10 +27,6 @@ public class Contact {
return localAuthorId;
}
public StorageStatus getStatus() {
return status;
}
@Override
public int hashCode() {
return id.hashCode();

View File

@@ -1,6 +1,7 @@
package org.briarproject.api.contact;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorId;
@@ -30,10 +31,10 @@ public interface ContactManager {
void removeContact(ContactId c) throws DbException;
interface AddContactHook {
void addingContact(ContactId c);
void addingContact(Transaction txn, Contact c) throws DbException;
}
interface RemoveContactHook {
void removingContact(ContactId c);
void removingContact(Transaction txn, Contact c) throws DbException;
}
}

View File

@@ -26,57 +26,85 @@ import java.util.Map;
/**
* Encapsulates the database implementation and exposes high-level operations
* to other components.
* <p>
* <p/>
* This interface's methods are blocking, but they do not call out into other
* components except to broadcast {@link org.briarproject.api.event.Event
* Events}, so they can safely be called while holding locks.
*/
public interface DatabaseComponent {
/** Opens the database and returns true if the database already existed. */
/**
* Opens the database and returns true if the database already existed.
*/
boolean open() throws DbException;
/** Waits for any open transactions to finish and closes the database. */
/**
* Waits for any open transactions to finish and closes the database.
*/
void close() throws DbException, IOException;
/**
* Starts a new transaction and returns an object representing it.
*/
Transaction startTransaction() throws DbException;
/**
* Ends a transaction. If the transaction is marked as complete, the
* transaction is committed and any events attached to the transaction are
* broadcast; otherwise the transaction is aborted.
*/
void endTransaction(Transaction txn) throws DbException;
/**
* Stores a contact associated with the given local and remote pseudonyms,
* and returns an ID for the contact.
*/
ContactId addContact(Author remote, AuthorId local) throws DbException;
/** Stores a group. */
void addGroup(Group g) throws DbException;
/** Stores a local pseudonym. */
void addLocalAuthor(LocalAuthor a) throws DbException;
/** Stores a local message. */
void addLocalMessage(Message m, ClientId c, Metadata meta, boolean shared)
ContactId addContact(Transaction txn, Author remote, AuthorId local)
throws DbException;
/** Stores a transport. */
void addTransport(TransportId t, int maxLatency) throws DbException;
/**
* Stores a group.
*/
void addGroup(Transaction txn, Group g) throws DbException;
/**
* Stores a local pseudonym.
*/
void addLocalAuthor(Transaction txn, LocalAuthor a) throws DbException;
/**
* Stores a local message.
*/
void addLocalMessage(Transaction txn, Message m, ClientId c, Metadata meta,
boolean shared) throws DbException;
/**
* Stores a transport.
*/
void addTransport(Transaction txn, TransportId t, int maxLatency)
throws DbException;
/**
* Stores transport keys for a newly added contact.
*/
void addTransportKeys(ContactId c, TransportKeys k) throws DbException;
void addTransportKeys(Transaction txn, ContactId c, TransportKeys k)
throws DbException;
/**
* Deletes the message with the given ID. The message ID and any other
* associated data are not deleted.
*/
void deleteMessage(MessageId m) throws DbException;
void deleteMessage(Transaction txn, MessageId m) throws DbException;
/** Deletes any metadata associated with the given message. */
void deleteMessageMetadata(MessageId m) throws DbException;
void deleteMessageMetadata(Transaction txn, MessageId m) throws DbException;
/**
* Returns an acknowledgement for the given contact, or null if there are
* no messages to acknowledge.
*/
Ack generateAck(ContactId c, int maxMessages) throws DbException;
Ack generateAck(Transaction txn, ContactId c, int maxMessages)
throws DbException;
/**
* Returns a batch of raw messages for the given contact, with a total
@@ -84,22 +112,23 @@ public interface DatabaseComponent {
* transport with the given maximum latency. Returns null if there are no
* sendable messages that fit in the given length.
*/
Collection<byte[]> generateBatch(ContactId c, int maxLength,
int maxLatency) throws DbException;
Collection<byte[]> generateBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
/**
* Returns an offer for the given contact for transmission over a
* transport with the given maximum latency, or null if there are no
* messages to offer.
*/
Offer generateOffer(ContactId c, int maxMessages, int maxLatency)
throws DbException;
Offer generateOffer(Transaction txn, ContactId c, int maxMessages,
int maxLatency) throws DbException;
/**
* Returns a request for the given contact, or null if there are no
* messages to request.
*/
Request generateRequest(ContactId c, int maxMessages) throws DbException;
Request generateRequest(Transaction txn, ContactId c, int maxMessages)
throws DbException;
/**
* Returns a batch of raw messages for the given contact, with a total
@@ -108,168 +137,214 @@ public interface DatabaseComponent {
* requested by the contact are returned. Returns null if there are no
* sendable messages that fit in the given length.
*/
Collection<byte[]> generateRequestedBatch(ContactId c, int maxLength,
int maxLatency) throws DbException;
Collection<byte[]> generateRequestedBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
/** Returns the contact with the given ID. */
Contact getContact(ContactId c) throws DbException;
/**
* Returns the contact with the given ID.
*/
Contact getContact(Transaction txn, ContactId c) throws DbException;
/** Returns all contacts. */
Collection<Contact> getContacts() throws DbException;
/**
* Returns all contacts.
*/
Collection<Contact> getContacts(Transaction txn) throws DbException;
/** Returns all contacts associated with the given local pseudonym. */
Collection<ContactId> getContacts(AuthorId a) throws DbException;
/**
* Returns all contacts associated with the given local pseudonym.
*/
Collection<ContactId> getContacts(Transaction txn, AuthorId a)
throws DbException;
/** Returns the unique ID for this device. */
DeviceId getDeviceId() throws DbException;
/**
* Returns the unique ID for this device.
*/
DeviceId getDeviceId(Transaction txn) throws DbException;
/** Returns the group with the given ID. */
Group getGroup(GroupId g) throws DbException;
/**
* Returns the group with the given ID.
*/
Group getGroup(Transaction txn, GroupId g) throws DbException;
/** Returns the metadata for the given group. */
Metadata getGroupMetadata(GroupId g) throws DbException;
/**
* Returns the metadata for the given group.
*/
Metadata getGroupMetadata(Transaction txn, GroupId g) throws DbException;
/** Returns all groups belonging to the given client. */
Collection<Group> getGroups(ClientId c) throws DbException;
/**
* Returns all groups belonging to the given client.
*/
Collection<Group> getGroups(Transaction txn, ClientId c) throws DbException;
/** Returns the local pseudonym with the given ID. */
LocalAuthor getLocalAuthor(AuthorId a) throws DbException;
/**
* Returns the local pseudonym with the given ID.
*/
LocalAuthor getLocalAuthor(Transaction txn, AuthorId a) throws DbException;
/** Returns all local pseudonyms. */
Collection<LocalAuthor> getLocalAuthors() throws DbException;
/**
* Returns all local pseudonyms.
*/
Collection<LocalAuthor> getLocalAuthors(Transaction txn) throws DbException;
/**
* Returns the IDs of any messages that need to be validated by the given
* client.
*/
Collection<MessageId> getMessagesToValidate(ClientId c) throws DbException;
/** Returns the message with the given ID, in serialised form. */
byte[] getRawMessage(MessageId m) throws DbException;
/** Returns the metadata for all messages in the given group. */
Map<MessageId, Metadata> getMessageMetadata(GroupId g)
Collection<MessageId> getMessagesToValidate(Transaction txn, ClientId c)
throws DbException;
/** Returns the metadata for the given message. */
Metadata getMessageMetadata(MessageId m) throws DbException;
/**
* Returns the message with the given ID, in serialised form.
*/
byte[] getRawMessage(Transaction txn, MessageId m) throws DbException;
/**
* Returns the metadata for all messages in the given group.
*/
Map<MessageId, Metadata> getMessageMetadata(Transaction txn, GroupId g)
throws DbException;
/**
* Returns the metadata for the given message.
*/
Metadata getMessageMetadata(Transaction txn, MessageId m)
throws DbException;
/**
* Returns the status of all messages in the given group with respect to
* the given contact.
*/
Collection<MessageStatus> getMessageStatus(ContactId c, GroupId g)
throws DbException;
Collection<MessageStatus> getMessageStatus(Transaction txn, ContactId c,
GroupId g) throws DbException;
/**
* Returns the status of the given message with respect to the given
* contact.
*/
MessageStatus getMessageStatus(ContactId c, MessageId m)
MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m)
throws DbException;
/** Returns all settings in the given namespace. */
Settings getSettings(String namespace) throws DbException;
/**
* Returns all settings in the given namespace.
*/
Settings getSettings(Transaction txn, String namespace) throws DbException;
/** Returns all transport keys for the given transport. */
Map<ContactId, TransportKeys> getTransportKeys(TransportId t)
/**
* Returns all transport keys for the given transport.
*/
Map<ContactId, TransportKeys> getTransportKeys(Transaction txn,
TransportId t) throws DbException;
/**
* Returns the maximum latencies in milliseconds of all transports.
*/
Map<TransportId, Integer> getTransportLatencies(Transaction txn)
throws DbException;
/** Returns the maximum latencies in milliseconds of all transports. */
Map<TransportId, Integer> getTransportLatencies() throws DbException;
/** Returns the IDs of all contacts to which the given group is visible. */
Collection<ContactId> getVisibility(GroupId g) throws DbException;
/**
* Increments the outgoing stream counter for the given contact and
* transport in the given rotation period .
*/
void incrementStreamCounter(ContactId c, TransportId t, long rotationPeriod)
throws DbException;
void incrementStreamCounter(Transaction txn, ContactId c, TransportId t,
long rotationPeriod) throws DbException;
/** Returns true if the given group is visible to the given contact. */
boolean isVisibleToContact(ContactId c, GroupId g) throws DbException;
/**
* Returns true if the given group is visible to the given contact.
*/
boolean isVisibleToContact(Transaction txn, ContactId c, GroupId g)
throws DbException;
/**
* Merges the given metadata with the existing metadata for the given
* group.
*/
void mergeGroupMetadata(GroupId g, Metadata meta) throws DbException;
void mergeGroupMetadata(Transaction txn, GroupId g, Metadata meta)
throws DbException;
/**
* Merges the given metadata with the existing metadata for the given
* message.
*/
void mergeMessageMetadata(MessageId m, Metadata meta) throws DbException;
void mergeMessageMetadata(Transaction txn, MessageId m, Metadata meta)
throws DbException;
/**
* Merges the given settings with the existing settings in the given
* namespace.
*/
void mergeSettings(Settings s, String namespace) throws DbException;
void mergeSettings(Transaction txn, Settings s, String namespace)
throws DbException;
/** Processes an ack from the given contact. */
void receiveAck(ContactId c, Ack a) throws DbException;
/**
* Processes an ack from the given contact.
*/
void receiveAck(Transaction txn, ContactId c, Ack a) throws DbException;
/** Processes a message from the given contact. */
void receiveMessage(ContactId c, Message m) throws DbException;
/**
* Processes a message from the given contact.
*/
void receiveMessage(Transaction txn, ContactId c, Message m)
throws DbException;
/** Processes an offer from the given contact. */
void receiveOffer(ContactId c, Offer o) throws DbException;
/**
* Processes an offer from the given contact.
*/
void receiveOffer(Transaction txn, ContactId c, Offer o) throws DbException;
/** Processes a request from the given contact. */
void receiveRequest(ContactId c, Request r) throws DbException;
/**
* Processes a request from the given contact.
*/
void receiveRequest(Transaction txn, ContactId c, Request r)
throws DbException;
/** Removes a contact (and all associated state) from the database. */
void removeContact(ContactId c) throws DbException;
/**
* Removes a contact (and all associated state) from the database.
*/
void removeContact(Transaction txn, ContactId c) throws DbException;
/** Removes a group (and all associated state) from the database. */
void removeGroup(Group g) throws DbException;
/**
* Removes a group (and all associated state) from the database.
*/
void removeGroup(Transaction txn, Group g) throws DbException;
/**
* Removes a local pseudonym (and all associated state) from the database.
*/
void removeLocalAuthor(AuthorId a) throws DbException;
void removeLocalAuthor(Transaction txn, AuthorId a) throws DbException;
/** Removes a transport (and all associated state) from the database. */
void removeTransport(TransportId t) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
*/
void removeTransport(Transaction txn, TransportId t) throws DbException;
/** Sets the status of the given contact. */
void setContactStatus(ContactId c, StorageStatus s) throws DbException;
/**
* Marks the given message as shared or unshared.
*/
void setMessageShared(Transaction txn, Message m, boolean shared)
throws DbException;
/** Sets the status of the given local pseudonym. */
void setLocalAuthorStatus(AuthorId a, StorageStatus s)
throws DbException;
/** Marks the given message as shared or unshared. */
void setMessageShared(Message m, boolean shared) throws DbException;
/** Marks the given message as valid or invalid. */
void setMessageValid(Message m, ClientId c, boolean valid)
/**
* Marks the given message as valid or invalid.
*/
void setMessageValid(Transaction txn, Message m, ClientId c, boolean valid)
throws DbException;
/**
* Sets the reordering window for the given contact and transport in the
* given rotation period.
*/
void setReorderingWindow(ContactId c, TransportId t, long rotationPeriod,
long base, byte[] bitmap) throws DbException;
void setReorderingWindow(Transaction txn, ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException;
/**
* Makes a group visible to the given set of contacts and invisible to any
* other contacts.
* Makes a group visible or invisible to a contact.
*/
void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException;
/** Makes a group visible or invisible to a contact. */
void setVisibleToContact(ContactId c, GroupId g, boolean visible)
throws DbException;
void setVisibleToContact(Transaction txn, ContactId c, GroupId g,
boolean visible) throws DbException;
/**
* Stores the given transport keys, deleting any keys they have replaced.
*/
void updateTransportKeys(Map<ContactId, TransportKeys> keys)
throws DbException;
void updateTransportKeys(Transaction txn,
Map<ContactId, TransportKeys> keys) throws DbException;
}

View File

@@ -1,21 +0,0 @@
package org.briarproject.api.db;
public enum StorageStatus {
ADDING(0), ACTIVE(1), REMOVING(2);
private final int value;
StorageStatus(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public static StorageStatus fromValue(int value) {
for (StorageStatus s : values()) if (s.value == value) return s;
throw new IllegalArgumentException();
}
}

View File

@@ -0,0 +1,63 @@
package org.briarproject.api.db;
import org.briarproject.api.event.Event;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* A wrapper around a database transaction. Transactions are not thread-safe.
*/
public class Transaction {
private final Object txn;
private List<Event> events = null;
private boolean complete = false;
public Transaction(Object txn) {
this.txn = txn;
}
/**
* Returns the database transaction. The type of the returned object
* depends on the database implementation.
*/
public Object unbox() {
return txn;
}
/**
* Attaches an event to be broadcast when the transaction has been
* committed.
*/
public void attach(Event e) {
if (events == null) events = new ArrayList<Event>();
events.add(e);
}
/**
* Returns any events attached to the transaction.
*/
public List<Event> getEvents() {
if (events == null) return Collections.emptyList();
return events;
}
/**
* Returns true if the transaction is ready to be committed.
*/
public boolean isComplete() {
return complete;
}
/**
* Marks the transaction as ready to be committed. This method must not be
* called more than once.
*/
public void setComplete() {
if (complete) throw new IllegalStateException();
complete = true;
}
}

View File

@@ -1,6 +1,7 @@
package org.briarproject.api.identity;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import java.util.Collection;
@@ -25,10 +26,11 @@ public interface IdentityManager {
void removeLocalAuthor(AuthorId a) throws DbException;
interface AddIdentityHook {
void addingIdentity(AuthorId a);
void addingIdentity(Transaction txn, LocalAuthor a) throws DbException;
}
interface RemoveIdentityHook {
void removingIdentity(AuthorId a);
void removingIdentity(Transaction txn, LocalAuthor a)
throws DbException;
}
}

View File

@@ -1,20 +1,16 @@
package org.briarproject.api.identity;
import org.briarproject.api.db.StorageStatus;
/** A pseudonym for the local user. */
public class LocalAuthor extends Author {
private final byte[] privateKey;
private final long created;
private final StorageStatus status;
public LocalAuthor(AuthorId id, String name, byte[] publicKey,
byte[] privateKey, long created, StorageStatus status) {
byte[] privateKey, long created) {
super(id, name, publicKey);
this.privateKey = privateKey;
this.created = created;
this.status = status;
}
/** Returns the private key used to generate the pseudonym's signatures. */
@@ -29,9 +25,4 @@ public class LocalAuthor extends Author {
public long getTimeCreated() {
return created;
}
/** Returns the status of the pseudonym. */
public StorageStatus getStatus() {
return status;
}
}

View File

@@ -1,6 +1,8 @@
package org.briarproject.api.sync;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction;
/**
* Responsible for managing message validators and passing them messages to
@@ -35,6 +37,7 @@ public interface ValidationManager {
void registerValidationHook(ValidationHook hook);
interface ValidationHook {
void validatingMessage(Message m, ClientId c, Metadata meta);
void validatingMessage(Transaction txn, Message m, ClientId c,
Metadata meta) throws DbException;
}
}

View File

@@ -7,75 +7,29 @@ import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.NoSuchContactException;
import org.briarproject.api.event.ContactAddedEvent;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorId;
import org.briarproject.api.identity.IdentityManager.RemoveIdentityHook;
import org.briarproject.api.lifecycle.Service;
import org.briarproject.api.identity.LocalAuthor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.db.StorageStatus.ACTIVE;
import static org.briarproject.api.db.StorageStatus.ADDING;
import static org.briarproject.api.db.StorageStatus.REMOVING;
class ContactManagerImpl implements ContactManager, Service,
RemoveIdentityHook {
private static final Logger LOG =
Logger.getLogger(ContactManagerImpl.class.getName());
class ContactManagerImpl implements ContactManager, RemoveIdentityHook {
private final DatabaseComponent db;
private final EventBus eventBus;
private final List<AddContactHook> addHooks;
private final List<RemoveContactHook> removeHooks;
@Inject
ContactManagerImpl(DatabaseComponent db, EventBus eventBus) {
ContactManagerImpl(DatabaseComponent db) {
this.db = db;
this.eventBus = eventBus;
addHooks = new CopyOnWriteArrayList<AddContactHook>();
removeHooks = new CopyOnWriteArrayList<RemoveContactHook>();
}
@Override
public boolean start() {
// Finish adding/removing any partly added/removed contacts
try {
for (Contact c : db.getContacts()) {
if (c.getStatus().equals(ADDING)) {
for (AddContactHook hook : addHooks)
hook.addingContact(c.getId());
db.setContactStatus(c.getId(), ACTIVE);
eventBus.broadcast(new ContactAddedEvent(c.getId()));
} else if (c.getStatus().equals(REMOVING)) {
for (RemoveContactHook hook : removeHooks)
hook.removingContact(c.getId());
db.removeContact(c.getId());
eventBus.broadcast(new ContactRemovedEvent(c.getId()));
}
}
return true;
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return false;
}
}
@Override
public boolean stop() {
return true;
}
@Override
public void registerAddContactHook(AddContactHook hook) {
addHooks.add(hook);
@@ -89,45 +43,70 @@ class ContactManagerImpl implements ContactManager, Service,
@Override
public ContactId addContact(Author remote, AuthorId local)
throws DbException {
ContactId c = db.addContact(remote, local);
for (AddContactHook hook : addHooks) hook.addingContact(c);
db.setContactStatus(c, ACTIVE);
eventBus.broadcast(new ContactAddedEvent(c));
ContactId c;
Transaction txn = db.startTransaction();
try {
c = db.addContact(txn, remote, local);
Contact contact = db.getContact(txn, c);
for (AddContactHook hook : addHooks)
hook.addingContact(txn, contact);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return c;
}
@Override
public Contact getContact(ContactId c) throws DbException {
Contact contact = db.getContact(c);
if (contact.getStatus().equals(ACTIVE)) return contact;
throw new NoSuchContactException();
Contact contact;
Transaction txn = db.startTransaction();
try {
contact = db.getContact(txn, c);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return contact;
}
@Override
public Collection<Contact> getContacts() throws DbException {
Collection<Contact> contacts = db.getContacts();
// Filter out any contacts that are being added or removed
List<Contact> active = new ArrayList<Contact>(contacts.size());
for (Contact c : contacts)
if (c.getStatus().equals(ACTIVE)) active.add(c);
return Collections.unmodifiableList(active);
Collection<Contact> contacts;
Transaction txn = db.startTransaction();
try {
contacts = db.getContacts(txn);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return contacts;
}
@Override
public void removeContact(ContactId c) throws DbException {
db.setContactStatus(c, REMOVING);
for (RemoveContactHook hook : removeHooks) hook.removingContact(c);
db.removeContact(c);
eventBus.broadcast(new ContactRemovedEvent(c));
Transaction txn = db.startTransaction();
try {
removeContact(txn, c);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
}
private void removeContact(Transaction txn, ContactId c)
throws DbException {
Contact contact = db.getContact(txn, c);
for (RemoveContactHook hook : removeHooks)
hook.removingContact(txn, contact);
db.removeContact(txn, c);
}
@Override
public void removingIdentity(AuthorId a) {
public void removingIdentity(Transaction txn, LocalAuthor a)
throws DbException {
// Remove any contacts of the local pseudonym that's being removed
try {
for (ContactId c : db.getContacts(a)) removeContact(c);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
for (ContactId c : db.getContacts(txn, a.getId()))
removeContact(txn, c);
}
}

View File

@@ -5,7 +5,6 @@ import com.google.inject.Provides;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.identity.IdentityManager;
import org.briarproject.api.lifecycle.LifecycleManager;
import javax.inject.Singleton;
@@ -15,10 +14,8 @@ public class ContactModule extends AbstractModule {
protected void configure() {}
@Provides @Singleton
ContactManager getContactManager(LifecycleManager lifecycleManager,
IdentityManager identityManager,
ContactManager getContactManager(IdentityManager identityManager,
ContactManagerImpl contactManager) {
lifecycleManager.register(contactManager);
identityManager.registerRemoveIdentityHook(contactManager);
return contactManager;
}

View File

@@ -6,7 +6,6 @@ import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.StorageStatus;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorId;
import org.briarproject.api.identity.LocalAuthor;
@@ -30,27 +29,23 @@ import java.util.Map;
* obtained by calling {@link #startTransaction()}. Every transaction must be
* terminated by calling either {@link #abortTransaction(T)} or
* {@link #commitTransaction(T)}, even if an exception is thrown.
* <p>
* Read-write locking is provided by the DatabaseComponent implementation.
*/
interface Database<T> {
/**
* Opens the database and returns true if the database already existed.
* <p>
* Locking: write.
*/
boolean open() throws DbException;
/**
* Prevents new transactions from starting, waits for all current
* transactions to finish, and closes the database.
* <p>
* Locking: write.
*/
void close() throws DbException, IOException;
/** Starts a new transaction and returns an object representing it. */
/**
* Starts a new transaction and returns an object representing it.
*/
T startTransaction() throws DbException;
/**
@@ -65,59 +60,39 @@ interface Database<T> {
*/
void commitTransaction(T txn) throws DbException;
/**
* Returns the number of transactions started since the transaction count
* was last reset.
*/
int getTransactionCount();
/** Resets the transaction count. */
void resetTransactionCount();
/**
* Stores a contact associated with the given local and remote pseudonyms,
* and returns an ID for the contact.
* <p>
* Locking: write.
*/
ContactId addContact(T txn, Author remote, AuthorId local)
throws DbException;
/**
* Stores a group.
* <p>
* Locking: write.
*/
void addGroup(T txn, Group g) throws DbException;
/**
* Stores a local pseudonym.
* <p>
* Locking: write.
*/
void addLocalAuthor(T txn, LocalAuthor a) throws DbException;
/**
* Stores a message.
* <p>
* Locking: write.
*/
void addMessage(T txn, Message m, Validity validity, boolean shared)
throws DbException;
/**
* Records that a message has been offered by the given contact.
* <p>
* Locking: write.
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
/**
* Initialises the status of the given message with respect to the given
* contact.
* <p>
* Locking: write.
* @param ack whether the message needs to be acknowledged.
*
* @param ack whether the message needs to be acknowledged.
* @param seen whether the contact has seen the message.
*/
void addStatus(T txn, ContactId c, MessageId m, boolean ack, boolean seen)
@@ -125,76 +100,56 @@ interface Database<T> {
/**
* Stores a transport.
* <p>
* Locking: write.
*/
void addTransport(T txn, TransportId t, int maxLatency)
throws DbException;
/**
* Stores transport keys for a newly added contact.
* <p>
* Locking: write.
*/
void addTransportKeys(T txn, ContactId c, TransportKeys k)
throws DbException;
/**
* Makes a group visible to the given contact.
* <p>
* Locking: write.
*/
void addVisibility(T txn, ContactId c, GroupId g) throws DbException;
/**
* Returns true if the database contains the given contact for the given
* local pseudonym.
* <p>
* Locking: read.
*/
boolean containsContact(T txn, AuthorId remote, AuthorId local)
throws DbException;
/**
* Returns true if the database contains the given contact.
* <p>
* Locking: read.
*/
boolean containsContact(T txn, ContactId c) throws DbException;
/**
* Returns true if the database contains the given group.
* <p>
* Locking: read.
*/
boolean containsGroup(T txn, GroupId g) throws DbException;
/**
* Returns true if the database contains the given local pseudonym.
* <p>
* Locking: read.
*/
boolean containsLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns true if the database contains the given message.
* <p>
* Locking: read.
*/
boolean containsMessage(T txn, MessageId m) throws DbException;
/**
* Returns true if the database contains the given transport.
* <p>
* Locking: read.
*/
boolean containsTransport(T txn, TransportId t) throws DbException;
/**
* Returns true if the database contains the given group and the group is
* visible to the given contact.
* <p>
* Locking: read.
*/
boolean containsVisibleGroup(T txn, ContactId c, GroupId g)
throws DbException;
@@ -202,16 +157,12 @@ interface Database<T> {
/**
* Returns true if the database contains the given message and the message
* is visible to the given contact.
* <p>
* Locking: read.
*/
boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException;
/**
* Returns the number of messages offered by the given contact.
* <p>
* Locking: read.
*/
int countOfferedMessages(T txn, ContactId c) throws DbException;
@@ -234,36 +185,26 @@ interface Database<T> {
/**
* Returns the contact with the given ID.
* <p>
* Locking: read.
*/
Contact getContact(T txn, ContactId c) throws DbException;
/**
* Returns the IDs of all contacts.
* <p>
* Locking: read.
*/
Collection<ContactId> getContactIds(T txn) throws DbException;
/**
* Returns all contacts.
* <p>
* Locking: read.
*/
Collection<Contact> getContacts(T txn) throws DbException;
/**
* Returns all contacts associated with the given local pseudonym.
* <p>
* Locking: read.
*/
Collection<ContactId> getContacts(T txn, AuthorId a) throws DbException;
/**
* Returns the unique ID for this device.
* <p>
* Locking: read.
*/
DeviceId getDeviceId(T txn) throws DbException;
@@ -276,59 +217,43 @@ interface Database<T> {
/**
* Returns the group with the given ID.
* <p>
* Locking: read.
*/
Group getGroup(T txn, GroupId g) throws DbException;
/**
* Returns the metadata for the given group.
* <p>
* Locking: read.
*/
Metadata getGroupMetadata(T txn, GroupId g) throws DbException;
/**
* Returns all groups belonging to the given client.
* <p>
* Locking: read.
*/
Collection<Group> getGroups(T txn, ClientId c) throws DbException;
/**
* Returns the local pseudonym with the given ID.
* <p>
* Locking: read.
*/
LocalAuthor getLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Returns all local pseudonyms.
* <p>
* Locking: read.
*/
Collection<LocalAuthor> getLocalAuthors(T txn) throws DbException;
/**
* Returns the metadata for all messages in the given group.
* <p>
* Locking: read.
*/
Map<MessageId, Metadata> getMessageMetadata(T txn, GroupId g)
throws DbException;
/**
* Returns the metadata for the given message.
* <p>
* Locking: read.
*/
Metadata getMessageMetadata(T txn, MessageId m) throws DbException;
/**
* Returns the status of all messages in the given group with respect
* to the given contact.
* <p>
* Locking: read
*/
Collection<MessageStatus> getMessageStatus(T txn, ContactId c, GroupId g)
throws DbException;
@@ -336,8 +261,6 @@ interface Database<T> {
/**
* Returns the status of the given message with respect to the given
* contact.
* <p>
* Locking: read
*/
MessageStatus getMessageStatus(T txn, ContactId c, MessageId m)
throws DbException;
@@ -345,8 +268,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
@@ -354,8 +275,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages that are eligible to be offered to the
* given contact, up to the given number of messages.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -363,8 +282,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given total length.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength)
throws DbException;
@@ -372,8 +289,6 @@ interface Database<T> {
/**
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
@@ -381,16 +296,12 @@ interface Database<T> {
/**
* Returns the IDs of any messages that need to be validated by the given
* client.
* <p>
* Locking: read.
*/
Collection<MessageId> getMessagesToValidate(T txn, ClientId c)
throws DbException;
/**
* Returns the message with the given ID, in serialised form.
* <p>
* Locking: read.
*/
byte[] getRawMessage(T txn, MessageId m) throws DbException;
@@ -398,46 +309,34 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be sent to the
* given contact and have been requested by the contact, up to the given
* total length.
* <p>
* Locking: read.
*/
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
int maxLength) throws DbException;
/**
* Returns all settings in the given namespace.
* <p>
* Locking: read.
*/
Settings getSettings(T txn, String namespace) throws DbException;
/**
* Returns all transport keys for the given transport.
* <p>
* Locking: read.
*/
Map<ContactId, TransportKeys> getTransportKeys(T txn, TransportId t)
throws DbException;
/**
* Returns the maximum latencies in milliseconds of all transports.
* <p>
* Locking: read.
*/
Map<TransportId, Integer> getTransportLatencies(T txn) throws DbException;
/**
* Returns the IDs of all contacts to which the given group is visible.
* <p>
* Locking: read.
*/
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
/**
* Increments the outgoing stream counter for the given contact and
* transport in the given rotation period.
* <p>
* Locking: write.
*/
void incrementStreamCounter(T txn, ContactId c, TransportId t,
long rotationPeriod) throws DbException;
@@ -445,8 +344,6 @@ interface Database<T> {
/**
* Marks the given messages as not needing to be acknowledged to the
* given contact.
* <p>
* Locking: write.
*/
void lowerAckFlag(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
@@ -454,8 +351,6 @@ interface Database<T> {
/**
* Marks the given messages as not having been requested by the given
* contact.
* <p>
* Locking: write.
*/
void lowerRequestedFlag(T txn, ContactId c, Collection<MessageId> requested)
throws DbException;
@@ -463,8 +358,6 @@ interface Database<T> {
/*
* Merges the given metadata with the existing metadata for the given
* group.
* <p>
* Locking: write.
*/
void mergeGroupMetadata(T txn, GroupId g, Metadata meta)
throws DbException;
@@ -472,8 +365,6 @@ interface Database<T> {
/*
* Merges the given metadata with the existing metadata for the given
* message.
* <p>
* Locking: write.
*/
void mergeMessageMetadata(T txn, MessageId m, Metadata meta)
throws DbException;
@@ -481,65 +372,47 @@ interface Database<T> {
/**
* Merges the given settings with the existing settings in the given
* namespace.
* <p>
* Locking: write.
*/
void mergeSettings(T txn, Settings s, String namespace) throws DbException;
/**
* Marks a message as needing to be acknowledged to the given contact.
* <p>
* Locking: write.
*/
void raiseAckFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been requested by the given contact.
* <p>
* Locking: write.
*/
void raiseRequestedFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been seen by the given contact.
* <p>
* Locking: write.
*/
void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a contact from the database.
* <p>
* Locking: write.
*/
void removeContact(T txn, ContactId c) throws DbException;
/**
* Removes a group (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeGroup(T txn, GroupId g) throws DbException;
/**
* Removes a local pseudonym (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeLocalAuthor(T txn, AuthorId a) throws DbException;
/**
* Removes a message (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeMessage(T txn, MessageId m) throws DbException;
/**
* Removes an offered message that was offered by the given contact, or
* returns false if there is no such message.
* <p>
* Locking: write.
*/
boolean removeOfferedMessage(T txn, ContactId c, MessageId m)
throws DbException;
@@ -547,70 +420,40 @@ interface Database<T> {
/**
* Removes the given offered messages that were offered by the given
* contact.
* <p>
* Locking: write.
*/
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
* <p>
* Locking: write.
*/
void removeTransport(T txn, TransportId t) throws DbException;
/**
* Makes a group invisible to the given contact.
* <p>
* Locking: write.
*/
void removeVisibility(T txn, ContactId c, GroupId g) throws DbException;
/**
* Resets the transmission count and expiry time of the given message with
* respect to the given contact.
* <p>
* Locking: write.
*/
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
/**
* Sets the status of the given contact.
* <p>
* Locking: write.
*/
void setContactStatus(T txn, ContactId c, StorageStatus s)
throws DbException;
/**
* Sets the status of the given local pseudonym.
* <p>
* Locking: write.
*/
void setLocalAuthorStatus(T txn, AuthorId a, StorageStatus s)
throws DbException;
/**
* Marks the given message as shared or unshared.
* <p>
* Locking: write.
*/
void setMessageShared(T txn, MessageId m, boolean shared)
throws DbException;
/**
* Marks the given message as valid or invalid.
* <p>
* Locking: write.
*/
void setMessageValid(T txn, MessageId m, boolean valid) throws DbException;
/**
* Sets the reordering window for the given contact and transport in the
* given rotation period.
* <p>
* Locking: write.
*/
void setReorderingWindow(T txn, ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException;
@@ -619,16 +462,12 @@ interface Database<T> {
* Updates the transmission count and expiry time of the given message
* with respect to the given contact, using the latency of the transport
* over which it was sent.
* <p>
* Locking: write.
*/
void updateExpiryTime(T txn, ContactId c, MessageId m, int maxLatency)
throws DbException;
/**
* Stores the given transport keys, deleting any keys they have replaced.
* <p>
* Locking: write.
*/
void updateTransportKeys(T txn, Map<ContactId, TransportKeys> keys)
throws DbException;

File diff suppressed because it is too large Load Diff

View File

@@ -51,7 +51,8 @@ public class DatabaseModule extends AbstractModule {
@Provides @Singleton
DatabaseComponent getDatabaseComponent(Database<Connection> db,
EventBus eventBus, ShutdownManager shutdown) {
return new DatabaseComponentImpl<Connection>(db, eventBus, shutdown);
return new DatabaseComponentImpl<Connection>(db, Connection.class,
eventBus, shutdown);
}
@Provides @Singleton @DatabaseExecutor

View File

@@ -9,7 +9,6 @@ import org.briarproject.api.crypto.SecretKey;
import org.briarproject.api.db.DbClosedException;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.StorageStatus;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorId;
import org.briarproject.api.identity.LocalAuthor;
@@ -41,7 +40,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -49,7 +47,6 @@ import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.db.Metadata.REMOVE;
import static org.briarproject.api.db.StorageStatus.ADDING;
import static org.briarproject.api.sync.ValidationManager.Validity.INVALID;
import static org.briarproject.api.sync.ValidationManager.Validity.UNKNOWN;
import static org.briarproject.api.sync.ValidationManager.Validity.VALID;
@@ -66,8 +63,8 @@ import static org.briarproject.db.ExponentialBackoff.calculateExpiry;
*/
abstract class JdbcDatabase implements Database<Connection> {
private static final int SCHEMA_VERSION = 20;
private static final int MIN_SCHEMA_VERSION = 20;
private static final int SCHEMA_VERSION = 21;
private static final int MIN_SCHEMA_VERSION = 21;
private static final String CREATE_SETTINGS =
"CREATE TABLE settings"
@@ -83,7 +80,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " publicKey BINARY NOT NULL,"
+ " privateKey BINARY NOT NULL,"
+ " created BIGINT NOT NULL,"
+ " status INT NOT NULL,"
+ " PRIMARY KEY (authorId))";
private static final String CREATE_CONTACTS =
@@ -93,7 +89,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " name VARCHAR NOT NULL,"
+ " publicKey BINARY NOT NULL,"
+ " localAuthorId HASH NOT NULL,"
+ " status INT NOT NULL,"
+ " PRIMARY KEY (contactId),"
+ " FOREIGN KEY (localAuthorId)"
+ " REFERENCES localAuthors (authorId)"
@@ -228,8 +223,6 @@ abstract class JdbcDatabase implements Database<Connection> {
private final LinkedList<Connection> connections =
new LinkedList<Connection>(); // Locking: connectionsLock
private final AtomicInteger transactionCount = new AtomicInteger(0);
private int openConnections = 0; // Locking: connectionsLock
private boolean closed = false; // Locking: connectionsLock
@@ -369,7 +362,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} catch (SQLException e) {
throw new DbException(e);
}
transactionCount.incrementAndGet();
return txn;
}
@@ -418,14 +410,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public int getTransactionCount() {
return transactionCount.get();
}
public void resetTransactionCount() {
transactionCount.set(0);
}
protected void closeAllConnections() throws SQLException {
boolean interrupted = false;
connectionsLock.lock();
@@ -464,14 +448,13 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
// Create a contact row
String sql = "INSERT INTO contacts"
+ " (authorId, name, publicKey, localAuthorId, status)"
+ " VALUES (?, ?, ?, ?, ?)";
+ " (authorId, name, publicKey, localAuthorId)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, remote.getId().getBytes());
ps.setString(2, remote.getName());
ps.setBytes(3, remote.getPublicKey());
ps.setBytes(4, local.getBytes());
ps.setInt(5, ADDING.getValue());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
@@ -540,16 +523,15 @@ abstract class JdbcDatabase implements Database<Connection> {
throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO localAuthors (authorId, name, publicKey,"
+ " privateKey, created, status)"
+ " VALUES (?, ?, ?, ?, ?, ?)";
String sql = "INSERT INTO localAuthors"
+ " (authorId, name, publicKey, privateKey, created)"
+ " VALUES (?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, a.getId().getBytes());
ps.setString(2, a.getName());
ps.setBytes(3, a.getPublicKey());
ps.setBytes(4, a.getPrivateKey());
ps.setLong(5, a.getTimeCreated());
ps.setInt(6, a.getStatus().getValue());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
@@ -969,8 +951,7 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT authorId, name, publicKey, localAuthorId,"
+ " status"
String sql = "SELECT authorId, name, publicKey, localAuthorId"
+ " FROM contacts"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
@@ -981,11 +962,10 @@ abstract class JdbcDatabase implements Database<Connection> {
String name = rs.getString(2);
byte[] publicKey = rs.getBytes(3);
AuthorId localAuthorId = new AuthorId(rs.getBytes(4));
StorageStatus status = StorageStatus.fromValue(rs.getInt(5));
rs.close();
ps.close();
Author author = new Author(authorId, name, publicKey);
return new Contact(c, author, localAuthorId, status);
return new Contact(c, author, localAuthorId);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
@@ -1019,7 +999,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ResultSet rs = null;
try {
String sql = "SELECT contactId, authorId, name, publicKey,"
+ " localAuthorId, status"
+ " localAuthorId"
+ " FROM contacts";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
@@ -1031,9 +1011,7 @@ abstract class JdbcDatabase implements Database<Connection> {
byte[] publicKey = rs.getBytes(4);
Author author = new Author(authorId, name, publicKey);
AuthorId localAuthorId = new AuthorId(rs.getBytes(5));
StorageStatus status = StorageStatus.fromValue(rs.getInt(6));
contacts.add(new Contact(contactId, author, localAuthorId,
status));
contacts.add(new Contact(contactId, author, localAuthorId));
}
rs.close();
ps.close();
@@ -1120,7 +1098,7 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT name, publicKey, privateKey, created, status"
String sql = "SELECT name, publicKey, privateKey, created"
+ " FROM localAuthors"
+ " WHERE authorId = ?";
ps = txn.prepareStatement(sql);
@@ -1131,9 +1109,8 @@ abstract class JdbcDatabase implements Database<Connection> {
byte[] publicKey = rs.getBytes(2);
byte[] privateKey = rs.getBytes(3);
long created = rs.getLong(4);
StorageStatus status = StorageStatus.fromValue(rs.getInt(5));
LocalAuthor localAuthor = new LocalAuthor(a, name, publicKey,
privateKey, created, status);
privateKey, created);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
@@ -1150,8 +1127,7 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT authorId, name, publicKey, privateKey,"
+ " created, status"
String sql = "SELECT authorId, name, publicKey, privateKey, created"
+ " FROM localAuthors";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
@@ -1162,9 +1138,8 @@ abstract class JdbcDatabase implements Database<Connection> {
byte[] publicKey = rs.getBytes(3);
byte[] privateKey = rs.getBytes(4);
long created = rs.getLong(5);
StorageStatus status = StorageStatus.fromValue(rs.getInt(6));
authors.add(new LocalAuthor(authorId, name, publicKey,
privateKey, created, status));
privateKey, created));
}
rs.close();
ps.close();
@@ -2053,41 +2028,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setContactStatus(Connection txn, ContactId c, StorageStatus s)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE contacts SET status = ? WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, s.getValue());
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setLocalAuthorStatus(Connection txn, AuthorId a,
StorageStatus s) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE localAuthors SET status = ?"
+ " WHERE authorId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, s.getValue());
ps.setBytes(2, a.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setMessageShared(Connection txn, MessageId m, boolean shared)
throws DbException {
PreparedStatement ps = null;

View File

@@ -4,7 +4,6 @@ import com.google.inject.Inject;
import org.briarproject.api.FormatException;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.data.BdfDictionary;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
@@ -13,6 +12,7 @@ import org.briarproject.api.data.MetadataParser;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.forum.Forum;
import org.briarproject.api.forum.ForumManager;
import org.briarproject.api.forum.ForumPost;
@@ -36,7 +36,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
@@ -58,20 +57,14 @@ class ForumManagerImpl implements ForumManager {
Logger.getLogger(ForumManagerImpl.class.getName());
private final DatabaseComponent db;
private final ContactManager contactManager;
private final BdfReaderFactory bdfReaderFactory;
private final MetadataEncoder metadataEncoder;
private final MetadataParser metadataParser;
/** Ensures isolation between database reads and writes. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Inject
ForumManagerImpl(DatabaseComponent db, ContactManager contactManager,
BdfReaderFactory bdfReaderFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser) {
ForumManagerImpl(DatabaseComponent db, BdfReaderFactory bdfReaderFactory,
MetadataEncoder metadataEncoder, MetadataParser metadataParser) {
this.db = db;
this.contactManager = contactManager;
this.bdfReaderFactory = bdfReaderFactory;
this.metadataEncoder = metadataEncoder;
this.metadataParser = metadataParser;
@@ -84,7 +77,6 @@ class ForumManagerImpl implements ForumManager {
@Override
public void addLocalPost(ForumPost p) throws DbException {
lock.writeLock().lock();
try {
BdfDictionary d = new BdfDictionary();
d.put("timestamp", p.getMessage().getTimestamp());
@@ -102,45 +94,65 @@ class ForumManagerImpl implements ForumManager {
d.put("local", true);
d.put("read", true);
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(p.getMessage(), CLIENT_ID, meta, true);
Transaction txn = db.startTransaction();
try {
db.addLocalMessage(txn, p.getMessage(), CLIENT_ID, meta, true);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (FormatException e) {
throw new RuntimeException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Forum getForum(GroupId g) throws DbException {
lock.readLock().lock();
try {
return parseForum(db.getGroup(g));
Group group;
Transaction txn = db.startTransaction();
try {
group = db.getGroup(txn, g);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return parseForum(group);
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<Forum> getForums() throws DbException {
lock.readLock().lock();
try {
Collection<Group> groups;
Transaction txn = db.startTransaction();
try {
groups = db.getGroups(txn, CLIENT_ID);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
List<Forum> forums = new ArrayList<Forum>();
for (Group g : db.getGroups(CLIENT_ID)) forums.add(parseForum(g));
for (Group g : groups) forums.add(parseForum(g));
return Collections.unmodifiableList(forums);
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public byte[] getPostBody(MessageId m) throws DbException {
lock.readLock().lock();
try {
byte[] raw = db.getRawMessage(m);
byte[] raw;
Transaction txn = db.startTransaction();
try {
raw = db.getRawMessage(txn, m);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
@@ -161,74 +173,76 @@ class ForumManagerImpl implements ForumManager {
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<ForumPostHeader> getPostHeaders(GroupId g)
throws DbException {
lock.readLock().lock();
Set<AuthorId> localAuthorIds = new HashSet<AuthorId>();
Set<AuthorId> contactAuthorIds = new HashSet<AuthorId>();
Map<MessageId, Metadata> metadata;
Transaction txn = db.startTransaction();
try {
// Load the IDs of the user's identities
Set<AuthorId> localAuthorIds = new HashSet<AuthorId>();
for (LocalAuthor a : db.getLocalAuthors())
for (LocalAuthor a : db.getLocalAuthors(txn))
localAuthorIds.add(a.getId());
// Load the IDs of contacts' identities
Set<AuthorId> contactAuthorIds = new HashSet<AuthorId>();
for (Contact c : contactManager.getContacts())
for (Contact c : db.getContacts(txn))
contactAuthorIds.add(c.getAuthor().getId());
// Load and parse the metadata
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Collection<ForumPostHeader> headers =
new ArrayList<ForumPostHeader>();
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
MessageId messageId = e.getKey();
Metadata meta = e.getValue();
try {
BdfDictionary d = metadataParser.parse(meta);
long timestamp = d.getInteger("timestamp");
Author author = null;
Author.Status authorStatus = ANONYMOUS;
BdfDictionary d1 = d.getDictionary("author", null);
if (d1 != null) {
AuthorId authorId = new AuthorId(d1.getRaw("id"));
String name = d1.getString("name");
byte[] publicKey = d1.getRaw("publicKey");
author = new Author(authorId, name, publicKey);
if (localAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else if (contactAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else authorStatus = UNKNOWN;
}
String contentType = d.getString("contentType");
boolean read = d.getBoolean("read");
headers.add(new ForumPostHeader(messageId, timestamp,
author, authorStatus, contentType, read));
} catch (FormatException ex) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, ex.toString(), ex);
}
}
return headers;
// Load the metadata
metadata = db.getMessageMetadata(txn, g);
txn.setComplete();
} finally {
lock.readLock().unlock();
db.endTransaction(txn);
}
// Parse the metadata
Collection<ForumPostHeader> headers = new ArrayList<ForumPostHeader>();
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
try {
BdfDictionary d = metadataParser.parse(e.getValue());
long timestamp = d.getInteger("timestamp");
Author author = null;
Author.Status authorStatus = ANONYMOUS;
BdfDictionary d1 = d.getDictionary("author", null);
if (d1 != null) {
AuthorId authorId = new AuthorId(d1.getRaw("id"));
String name = d1.getString("name");
byte[] publicKey = d1.getRaw("publicKey");
author = new Author(authorId, name, publicKey);
if (localAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else if (contactAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else authorStatus = UNKNOWN;
}
String contentType = d.getString("contentType");
boolean read = d.getBoolean("read");
headers.add(new ForumPostHeader(e.getKey(), timestamp, author,
authorStatus, contentType, read));
} catch (FormatException ex) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, ex.toString(), ex);
}
}
return headers;
}
@Override
public void setReadFlag(MessageId m, boolean read) throws DbException {
lock.writeLock().lock();
try {
BdfDictionary d = new BdfDictionary();
d.put("read", read);
db.mergeMessageMetadata(m, metadataEncoder.encode(d));
Metadata meta = metadataEncoder.encode(d);
Transaction txn = db.startTransaction();
try {
db.mergeMessageMetadata(txn, m, meta);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (FormatException e) {
throw new RuntimeException(e);
} finally {
lock.writeLock().unlock();
}
}

View File

@@ -5,7 +5,6 @@ import com.google.inject.Inject;
import org.briarproject.api.FormatException;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.contact.ContactManager.AddContactHook;
import org.briarproject.api.contact.ContactManager.RemoveContactHook;
import org.briarproject.api.data.BdfDictionary;
@@ -18,6 +17,7 @@ import org.briarproject.api.data.MetadataParser;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.forum.Forum;
import org.briarproject.api.forum.ForumManager;
import org.briarproject.api.forum.ForumSharingManager;
@@ -45,10 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.forum.ForumConstants.FORUM_SALT_LENGTH;
import static org.briarproject.api.forum.ForumConstants.MAX_FORUM_NAME_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
@@ -62,11 +59,7 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
private static final byte[] LOCAL_GROUP_DESCRIPTOR = new byte[0];
private static final Logger LOG =
Logger.getLogger(ForumSharingManagerImpl.class.getName());
private final DatabaseComponent db;
private final ContactManager contactManager;
private final ForumManager forumManager;
private final GroupFactory groupFactory;
private final PrivateGroupFactory privateGroupFactory;
@@ -79,18 +72,14 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
private final Clock clock;
private final Group localGroup;
/** Ensures isolation between database reads and writes. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Inject
ForumSharingManagerImpl(DatabaseComponent db,
ContactManager contactManager, ForumManager forumManager,
GroupFactory groupFactory, PrivateGroupFactory privateGroupFactory,
ForumManager forumManager, GroupFactory groupFactory,
PrivateGroupFactory privateGroupFactory,
MessageFactory messageFactory, BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser, SecureRandom random, Clock clock) {
this.db = db;
this.contactManager = contactManager;
this.forumManager = forumManager;
this.groupFactory = groupFactory;
this.privateGroupFactory = privateGroupFactory;
@@ -106,57 +95,39 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
}
@Override
public void addingContact(ContactId c) {
lock.writeLock().lock();
public void addingContact(Transaction txn, Contact c) throws DbException {
try {
// Create a group to share with the contact
Group g = getContactGroup(db.getContact(c));
Group g = getContactGroup(c);
// Store the group and share it with the contact
db.addGroup(g);
db.setVisibility(g.getId(), Collections.singletonList(c));
db.addGroup(txn, g);
db.setVisibleToContact(txn, c.getId(), g.getId(), true);
// Attach the contact ID to the group
BdfDictionary d = new BdfDictionary();
d.put("contactId", c.getInt());
db.mergeGroupMetadata(g.getId(), metadataEncoder.encode(d));
d.put("contactId", c.getId().getInt());
db.mergeGroupMetadata(txn, g.getId(), metadataEncoder.encode(d));
// Share any forums that are shared with all contacts
List<Forum> shared = getForumsSharedWithAllContacts();
storeMessage(g.getId(), shared, 0);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
List<Forum> shared = getForumsSharedWithAllContacts(txn);
storeMessage(txn, g.getId(), shared, 0);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
throw new DbException(e);
}
}
@Override
public void removingContact(ContactId c) {
lock.writeLock().lock();
try {
db.removeGroup(getContactGroup(db.getContact(c)));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
public void removingContact(Transaction txn, Contact c) throws DbException {
db.removeGroup(txn, getContactGroup(c));
}
@Override
public void validatingMessage(Message m, ClientId c, Metadata meta) {
public void validatingMessage(Transaction txn, Message m, ClientId c,
Metadata meta) throws DbException {
if (c.equals(CLIENT_ID)) {
lock.writeLock().lock();
try {
ContactId contactId = getContactId(m.getGroupId());
setForumVisibility(contactId, getVisibleForums(m));
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
ContactId contactId = getContactId(txn, m.getGroupId());
setForumVisibility(txn, contactId, getVisibleForums(txn, m));
} catch (FormatException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
throw new DbException(e);
}
}
}
@@ -179,149 +150,162 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
@Override
public void addForum(Forum f) throws DbException {
lock.writeLock().lock();
Transaction txn = db.startTransaction();
try {
db.addGroup(f.getGroup());
db.addGroup(txn, f.getGroup());
txn.setComplete();
} finally {
lock.writeLock().unlock();
db.endTransaction(txn);
}
}
@Override
public void removeForum(Forum f) throws DbException {
lock.writeLock().lock();
try {
// Update the list of forums shared with each contact
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
removeFromList(contactGroup.getId(), f);
// Update the list shared with each contact
Transaction txn = db.startTransaction();
try {
for (Contact c : db.getContacts(txn))
removeFromList(txn, getContactGroup(c).getId(), f);
db.removeGroup(txn, f.getGroup());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
db.removeGroup(f.getGroup());
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Collection<Forum> getAvailableForums() throws DbException {
lock.readLock().lock();
try {
// Get any forums we subscribe to
Set<Group> subscribed = new HashSet<Group>(db.getGroups(
forumManager.getClientId()));
// Get all forums shared by contacts
Set<Forum> available = new HashSet<Forum>();
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
// Find the latest update version
LatestUpdate latest = findLatest(g.getId(), false);
if (latest != null) {
// Retrieve and parse the latest update
byte[] raw = db.getRawMessage(latest.messageId);
for (Forum f : parseForumList(raw)) {
if (!subscribed.contains(f.getGroup()))
available.add(f);
Transaction txn = db.startTransaction();
try {
// Get any forums we subscribe to
Set<Group> subscribed = new HashSet<Group>(db.getGroups(txn,
forumManager.getClientId()));
// Get all forums shared by contacts
for (Contact c : db.getContacts(txn)) {
Group g = getContactGroup(c);
// Find the latest update version
LatestUpdate latest = findLatest(txn, g.getId(), false);
if (latest != null) {
// Retrieve and parse the latest update
byte[] raw = db.getRawMessage(txn, latest.messageId);
for (Forum f : parseForumList(raw)) {
if (!subscribed.contains(f.getGroup()))
available.add(f);
}
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return Collections.unmodifiableSet(available);
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<Contact> getSharedBy(GroupId g) throws DbException {
lock.readLock().lock();
try {
List<Contact> subscribers = new ArrayList<Contact>();
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (listContains(contactGroup.getId(), g, false))
subscribers.add(c);
Transaction txn = db.startTransaction();
try {
for (Contact c : db.getContacts(txn)) {
if (listContains(txn, getContactGroup(c).getId(), g, false))
subscribers.add(c);
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return Collections.unmodifiableList(subscribers);
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<ContactId> getSharedWith(GroupId g) throws DbException {
lock.readLock().lock();
try {
List<ContactId> shared = new ArrayList<ContactId>();
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (listContains(contactGroup.getId(), g, true))
shared.add(c.getId());
Transaction txn = db.startTransaction();
try {
for (Contact c : db.getContacts(txn)) {
if (listContains(txn, getContactGroup(c).getId(), g, true))
shared.add(c.getId());
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return Collections.unmodifiableList(shared);
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public void setSharedWith(GroupId g, Collection<ContactId> shared)
throws DbException {
lock.writeLock().lock();
try {
// Retrieve the forum
Forum f = parseForum(db.getGroup(g));
// Remove the forum from the list of forums shared with all contacts
removeFromList(localGroup.getId(), f);
// Update the list of forums shared with each contact
shared = new HashSet<ContactId>(shared);
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (shared.contains(c.getId())) {
if (addToList(contactGroup.getId(), f)) {
// If the contact is sharing the forum, make it visible
if (listContains(contactGroup.getId(), g, false))
db.setVisibleToContact(c.getId(), g, true);
Transaction txn = db.startTransaction();
try {
// Retrieve the forum
Forum f = parseForum(db.getGroup(txn, g));
// Remove the forum from the list shared with all contacts
removeFromList(txn, localGroup.getId(), f);
// Update the list shared with each contact
shared = new HashSet<ContactId>(shared);
for (Contact c : db.getContacts(txn)) {
Group cg = getContactGroup(c);
if (shared.contains(c.getId())) {
if (addToList(txn, cg.getId(), f)) {
if (listContains(txn, cg.getId(), g, false))
db.setVisibleToContact(txn, c.getId(), g, true);
}
} else {
removeFromList(txn, cg.getId(), f);
db.setVisibleToContact(txn, c.getId(), g, false);
}
} else {
removeFromList(contactGroup.getId(), f);
db.setVisibleToContact(c.getId(), g, false);
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void setSharedWithAll(GroupId g) throws DbException {
lock.writeLock().lock();
try {
// Retrieve the forum
Forum f = parseForum(db.getGroup(g));
// Add the forum to the list of forums shared with all contacts
addToList(localGroup.getId(), f);
// Add the forum to the list of forums shared with each contact
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (addToList(contactGroup.getId(), f)) {
// If the contact is sharing the forum, make it visible
if (listContains(contactGroup.getId(), g, false))
db.setVisibleToContact(getContactId(g), g, true);
Transaction txn = db.startTransaction();
try {
// Retrieve the forum
Forum f = parseForum(db.getGroup(txn, g));
// Add the forum to the list shared with all contacts
addToList(txn, localGroup.getId(), f);
// Add the forum to the list shared with each contact
for (Contact c : db.getContacts(txn)) {
Group cg = getContactGroup(c);
if (addToList(txn, cg.getId(), f)) {
if (listContains(txn, cg.getId(), g, false))
db.setVisibleToContact(txn, c.getId(), g, true);
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@@ -329,23 +313,21 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
return privateGroupFactory.createPrivateGroup(CLIENT_ID, c);
}
// Locking: lock.writeLock
private List<Forum> getForumsSharedWithAllContacts() throws DbException,
FormatException {
private List<Forum> getForumsSharedWithAllContacts(Transaction txn)
throws DbException, FormatException {
// Ensure the local group exists
db.addGroup(localGroup);
db.addGroup(txn, localGroup);
// Find the latest update in the local group
LatestUpdate latest = findLatest(localGroup.getId(), true);
LatestUpdate latest = findLatest(txn, localGroup.getId(), true);
if (latest == null) return Collections.emptyList();
// Retrieve and parse the latest update
return parseForumList(db.getRawMessage(latest.messageId));
return parseForumList(db.getRawMessage(txn, latest.messageId));
}
// Locking: lock.readLock
private LatestUpdate findLatest(GroupId g, boolean local)
private LatestUpdate findLatest(Transaction txn, GroupId g, boolean local)
throws DbException, FormatException {
LatestUpdate latest = null;
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Map<MessageId, Metadata> metadata = db.getMessageMetadata(txn, g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getBoolean("local") != local) continue;
@@ -384,16 +366,20 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
}
}
// Locking: lock.writeLock
private void storeMessage(GroupId g, List<Forum> forums, long version)
throws DbException, FormatException {
byte[] body = encodeForumList(forums, version);
long now = clock.currentTimeMillis();
Message m = messageFactory.createMessage(g, now, body);
BdfDictionary d = new BdfDictionary();
d.put("version", version);
d.put("local", true);
db.addLocalMessage(m, CLIENT_ID, metadataEncoder.encode(d), true);
private void storeMessage(Transaction txn, GroupId g, List<Forum> forums,
long version) throws DbException {
try {
byte[] body = encodeForumList(forums, version);
long now = clock.currentTimeMillis();
Message m = messageFactory.createMessage(g, now, body);
BdfDictionary d = new BdfDictionary();
d.put("version", version);
d.put("local", true);
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(txn, m, CLIENT_ID, meta, true);
} catch (FormatException e) {
throw new RuntimeException(e);
}
}
private byte[] encodeForumList(List<Forum> forums, long version) {
@@ -418,23 +404,21 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
return out.toByteArray();
}
// Locking: lock.readLock
private ContactId getContactId(GroupId contactGroupId) throws DbException,
FormatException {
Metadata meta = db.getGroupMetadata(contactGroupId);
private ContactId getContactId(Transaction txn, GroupId contactGroupId)
throws DbException, FormatException {
Metadata meta = db.getGroupMetadata(txn, contactGroupId);
BdfDictionary d = metadataParser.parse(meta);
return new ContactId(d.getInteger("contactId").intValue());
}
// Locking: lock.readLock
private Set<GroupId> getVisibleForums(Message remoteUpdate)
throws DbException, FormatException {
private Set<GroupId> getVisibleForums(Transaction txn,
Message remoteUpdate) throws DbException, FormatException {
// Get the latest local update
LatestUpdate local = findLatest(remoteUpdate.getGroupId(), true);
LatestUpdate local = findLatest(txn, remoteUpdate.getGroupId(), true);
// If there's no local update, no forums are visible
if (local == null) return Collections.emptySet();
// Intersect the sets of shared forums
byte[] localRaw = db.getRawMessage(local.messageId);
byte[] localRaw = db.getRawMessage(txn, local.messageId);
Set<Forum> shared = new HashSet<Forum>(parseForumList(localRaw));
shared.retainAll(parseForumList(remoteUpdate.getRaw()));
// Forums in the intersection should be visible
@@ -443,16 +427,15 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
return visible;
}
// Locking: lock.writeLock
private void setForumVisibility(ContactId c, Set<GroupId> visible)
throws DbException {
for (Group g : db.getGroups(forumManager.getClientId())) {
boolean isVisible = db.isVisibleToContact(c, g.getId());
private void setForumVisibility(Transaction txn, ContactId c,
Set<GroupId> visible) throws DbException {
for (Group g : db.getGroups(txn, forumManager.getClientId())) {
boolean isVisible = db.isVisibleToContact(txn, c, g.getId());
boolean shouldBeVisible = visible.contains(g.getId());
if (isVisible && !shouldBeVisible)
db.setVisibleToContact(c, g.getId(), false);
db.setVisibleToContact(txn, c, g.getId(), false);
else if (!isVisible && shouldBeVisible)
db.setVisibleToContact(c, g.getId(), true);
db.setVisibleToContact(txn, c, g.getId(), true);
}
}
@@ -491,38 +474,38 @@ class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
}
}
// Locking: lock.readLock
private boolean listContains(GroupId g, GroupId forum, boolean local)
throws DbException, FormatException {
LatestUpdate latest = findLatest(g, local);
private boolean listContains(Transaction txn, GroupId g, GroupId forum,
boolean local) throws DbException, FormatException {
LatestUpdate latest = findLatest(txn, g, local);
if (latest == null) return false;
List<Forum> list = parseForumList(db.getRawMessage(latest.messageId));
byte[] raw = db.getRawMessage(txn, latest.messageId);
List<Forum> list = parseForumList(raw);
for (Forum f : list) if (f.getId().equals(forum)) return true;
return false;
}
// Locking: lock.writeLock
private boolean addToList(GroupId g, Forum f) throws DbException,
FormatException {
LatestUpdate latest = findLatest(g, true);
private boolean addToList(Transaction txn, GroupId g, Forum f)
throws DbException, FormatException {
LatestUpdate latest = findLatest(txn, g, true);
if (latest == null) {
storeMessage(g, Collections.singletonList(f), 0);
storeMessage(txn, g, Collections.singletonList(f), 0);
return true;
}
List<Forum> list = parseForumList(db.getRawMessage(latest.messageId));
byte[] raw = db.getRawMessage(txn, latest.messageId);
List<Forum> list = parseForumList(raw);
if (list.contains(f)) return false;
list.add(f);
storeMessage(g, list, latest.version + 1);
storeMessage(txn, g, list, latest.version + 1);
return true;
}
// Locking: lock.writeLock
private void removeFromList(GroupId g, Forum f) throws DbException,
FormatException {
LatestUpdate latest = findLatest(g, true);
private void removeFromList(Transaction txn, GroupId g, Forum f)
throws DbException, FormatException {
LatestUpdate latest = findLatest(txn, g, true);
if (latest == null) return;
List<Forum> list = parseForumList(db.getRawMessage(latest.messageId));
if (list.remove(f)) storeMessage(g, list, latest.version + 1);
byte[] raw = db.getRawMessage(txn, latest.messageId);
List<Forum> list = parseForumList(raw);
if (list.remove(f)) storeMessage(txn, g, list, latest.version + 1);
}
private static class LatestUpdate {

View File

@@ -1,4 +1,4 @@
package org.briarproject.sync;
package org.briarproject.identity;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfWriter;
@@ -14,9 +14,6 @@ import java.io.IOException;
import javax.inject.Inject;
import static org.briarproject.api.db.StorageStatus.ADDING;
// TODO: Move this class to the identity package
class AuthorFactoryImpl implements AuthorFactory {
private final CryptoComponent crypto;
@@ -38,7 +35,7 @@ class AuthorFactoryImpl implements AuthorFactory {
public LocalAuthor createLocalAuthor(String name, byte[] publicKey,
byte[] privateKey) {
return new LocalAuthor(getId(name, publicKey), name, publicKey,
privateKey, clock.currentTimeMillis(), ADDING);
privateKey, clock.currentTimeMillis());
}
private AuthorId getId(String name, byte[] publicKey) {

View File

@@ -1,4 +1,4 @@
package org.briarproject.sync;
package org.briarproject.identity;
import org.briarproject.api.FormatException;
import org.briarproject.api.data.BdfReader;
@@ -11,7 +11,6 @@ import java.io.IOException;
import static org.briarproject.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
import static org.briarproject.api.identity.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
// TODO: Move this class to the identity package
class AuthorReader implements ObjectReader<Author> {
private final AuthorFactory authorFactory;

View File

@@ -4,74 +4,28 @@ import com.google.inject.Inject;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.NoSuchLocalAuthorException;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.LocalAuthorAddedEvent;
import org.briarproject.api.event.LocalAuthorRemovedEvent;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.identity.AuthorId;
import org.briarproject.api.identity.IdentityManager;
import org.briarproject.api.identity.LocalAuthor;
import org.briarproject.api.lifecycle.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.db.StorageStatus.ACTIVE;
import static org.briarproject.api.db.StorageStatus.ADDING;
import static org.briarproject.api.db.StorageStatus.REMOVING;
class IdentityManagerImpl implements IdentityManager, Service {
private static final Logger LOG =
Logger.getLogger(IdentityManagerImpl.class.getName());
class IdentityManagerImpl implements IdentityManager {
private final DatabaseComponent db;
private final EventBus eventBus;
private final List<AddIdentityHook> addHooks;
private final List<RemoveIdentityHook> removeHooks;
@Inject
IdentityManagerImpl(DatabaseComponent db, EventBus eventBus) {
IdentityManagerImpl(DatabaseComponent db) {
this.db = db;
this.eventBus = eventBus;
addHooks = new CopyOnWriteArrayList<AddIdentityHook>();
removeHooks = new CopyOnWriteArrayList<RemoveIdentityHook>();
}
@Override
public boolean start() {
// Finish adding/removing any partly added/removed pseudonyms
try {
for (LocalAuthor a : db.getLocalAuthors()) {
if (a.getStatus().equals(ADDING)) {
for (AddIdentityHook hook : addHooks)
hook.addingIdentity(a.getId());
db.setLocalAuthorStatus(a.getId(), ACTIVE);
eventBus.broadcast(new LocalAuthorAddedEvent(a.getId()));
} else if (a.getStatus().equals(REMOVING)) {
for (RemoveIdentityHook hook : removeHooks)
hook.removingIdentity(a.getId());
db.removeLocalAuthor(a.getId());
eventBus.broadcast(new LocalAuthorRemovedEvent(a.getId()));
}
}
return true;
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return false;
}
}
@Override
public boolean stop() {
return false;
}
@Override
public void registerAddIdentityHook(AddIdentityHook hook) {
addHooks.add(hook);
@@ -83,35 +37,55 @@ class IdentityManagerImpl implements IdentityManager, Service {
}
@Override
public void addLocalAuthor(LocalAuthor a) throws DbException {
db.addLocalAuthor(a);
for (AddIdentityHook hook : addHooks) hook.addingIdentity(a.getId());
db.setLocalAuthorStatus(a.getId(), ACTIVE);
eventBus.broadcast(new LocalAuthorAddedEvent(a.getId()));
public void addLocalAuthor(LocalAuthor localAuthor) throws DbException {
Transaction txn = db.startTransaction();
try {
db.addLocalAuthor(txn, localAuthor);
for (AddIdentityHook hook : addHooks)
hook.addingIdentity(txn, localAuthor);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
}
@Override
public LocalAuthor getLocalAuthor(AuthorId a) throws DbException {
LocalAuthor author = db.getLocalAuthor(a);
if (author.getStatus().equals(ACTIVE)) return author;
throw new NoSuchLocalAuthorException();
LocalAuthor author;
Transaction txn = db.startTransaction();
try {
author = db.getLocalAuthor(txn, a);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return author;
}
@Override
public Collection<LocalAuthor> getLocalAuthors() throws DbException {
Collection<LocalAuthor> authors = db.getLocalAuthors();
// Filter out any pseudonyms that are being added or removed
List<LocalAuthor> active = new ArrayList<LocalAuthor>(authors.size());
for (LocalAuthor a : authors)
if (a.getStatus().equals(ACTIVE)) active.add(a);
return Collections.unmodifiableList(active);
Collection<LocalAuthor> authors;
Transaction txn = db.startTransaction();
try {
authors = db.getLocalAuthors(txn);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return authors;
}
@Override
public void removeLocalAuthor(AuthorId a) throws DbException {
db.setLocalAuthorStatus(a, REMOVING);
for (RemoveIdentityHook hook : removeHooks) hook.removingIdentity(a);
db.removeLocalAuthor(a);
eventBus.broadcast(new LocalAuthorRemovedEvent(a));
Transaction txn = db.startTransaction();
try {
LocalAuthor localAuthor = db.getLocalAuthor(txn, a);
for (RemoveIdentityHook hook : removeHooks)
hook.removingIdentity(txn, localAuthor);
db.removeLocalAuthor(txn, a);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
}
}

View File

@@ -1,13 +1,23 @@
package org.briarproject.identity;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorFactory;
import org.briarproject.api.identity.IdentityManager;
public class IdentityModule extends AbstractModule {
@Override
protected void configure() {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
bind(IdentityManager.class).to(IdentityManagerImpl.class);
}
@Provides
ObjectReader<Author> getAuthorReader(AuthorFactory authorFactory) {
return new AuthorReader(authorFactory);
}
}

View File

@@ -5,7 +5,6 @@ import com.google.inject.Inject;
import org.briarproject.api.FormatException;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.contact.ContactManager.AddContactHook;
import org.briarproject.api.contact.ContactManager.RemoveContactHook;
import org.briarproject.api.data.BdfDictionary;
@@ -16,7 +15,7 @@ import org.briarproject.api.data.MetadataParser;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.NoSuchContactException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.messaging.MessagingManager;
import org.briarproject.api.messaging.PrivateMessage;
import org.briarproject.api.messaging.PrivateMessageHeader;
@@ -32,7 +31,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.logging.Logger;
@@ -51,19 +49,17 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
Logger.getLogger(MessagingManagerImpl.class.getName());
private final DatabaseComponent db;
private final ContactManager contactManager;
private final PrivateGroupFactory privateGroupFactory;
private final BdfReaderFactory bdfReaderFactory;
private final MetadataEncoder metadataEncoder;
private final MetadataParser metadataParser;
@Inject
MessagingManagerImpl(DatabaseComponent db, ContactManager contactManager,
MessagingManagerImpl(DatabaseComponent db,
PrivateGroupFactory privateGroupFactory,
BdfReaderFactory bdfReaderFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser) {
this.db = db;
this.contactManager = contactManager;
this.privateGroupFactory = privateGroupFactory;
this.bdfReaderFactory = bdfReaderFactory;
this.metadataEncoder = metadataEncoder;
@@ -71,19 +67,17 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
}
@Override
public void addingContact(ContactId c) {
public void addingContact(Transaction txn, Contact c) throws DbException {
try {
// Create a group to share with the contact
Group g = getContactGroup(db.getContact(c));
Group g = getContactGroup(c);
// Store the group and share it with the contact
db.addGroup(g);
db.setVisibility(g.getId(), Collections.singletonList(c));
db.addGroup(txn, g);
db.setVisibleToContact(txn, c.getId(), g.getId(), true);
// Attach the contact ID to the group
BdfDictionary d = new BdfDictionary();
d.put("contactId", c.getInt());
db.mergeGroupMetadata(g.getId(), metadataEncoder.encode(d));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
d.put("contactId", c.getId().getInt());
db.mergeGroupMetadata(txn, g.getId(), metadataEncoder.encode(d));
} catch (FormatException e) {
throw new RuntimeException(e);
}
@@ -94,12 +88,8 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
}
@Override
public void removingContact(ContactId c) {
try {
db.removeGroup(getContactGroup(db.getContact(c)));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
public void removingContact(Transaction txn, Contact c) throws DbException {
db.removeGroup(txn, getContactGroup(c));
}
@Override
@@ -109,15 +99,22 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
@Override
public void addLocalMessage(PrivateMessage m) throws DbException {
BdfDictionary d = new BdfDictionary();
d.put("timestamp", m.getMessage().getTimestamp());
if (m.getParent() != null) d.put("parent", m.getParent().getBytes());
d.put("contentType", m.getContentType());
d.put("local", true);
d.put("read", true);
try {
BdfDictionary d = new BdfDictionary();
d.put("timestamp", m.getMessage().getTimestamp());
if (m.getParent() != null)
d.put("parent", m.getParent().getBytes());
d.put("contentType", m.getContentType());
d.put("local", true);
d.put("read", true);
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(m.getMessage(), CLIENT_ID, meta, true);
Transaction txn = db.startTransaction();
try {
db.addLocalMessage(txn, m.getMessage(), CLIENT_ID, meta, true);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (FormatException e) {
throw new RuntimeException(e);
}
@@ -126,26 +123,48 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
@Override
public ContactId getContactId(GroupId g) throws DbException {
try {
BdfDictionary d = metadataParser.parse(db.getGroupMetadata(g));
long id = d.getInteger("contactId");
return new ContactId((int) id);
Metadata meta;
Transaction txn = db.startTransaction();
try {
meta = db.getGroupMetadata(txn, g);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
BdfDictionary d = metadataParser.parse(meta);
return new ContactId(d.getInteger("contactId").intValue());
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
throw new NoSuchContactException();
throw new DbException(e);
}
}
@Override
public GroupId getConversationId(ContactId c) throws DbException {
return getContactGroup(contactManager.getContact(c)).getId();
Contact contact;
Transaction txn = db.startTransaction();
try {
contact = db.getContact(txn, c);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return getContactGroup(contact).getId();
}
@Override
public Collection<PrivateMessageHeader> getMessageHeaders(ContactId c)
throws DbException {
GroupId groupId = getConversationId(c);
Map<MessageId, Metadata> metadata = db.getMessageMetadata(groupId);
Collection<MessageStatus> statuses = db.getMessageStatus(c, groupId);
Map<MessageId, Metadata> metadata;
Collection<MessageStatus> statuses;
Transaction txn = db.startTransaction();
try {
GroupId g = getContactGroup(db.getContact(txn, c)).getId();
metadata = db.getMessageMetadata(txn, g);
statuses = db.getMessageStatus(txn, c, g);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
Collection<PrivateMessageHeader> headers =
new ArrayList<PrivateMessageHeader>();
for (MessageStatus s : statuses) {
@@ -169,7 +188,14 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
@Override
public byte[] getMessageBody(MessageId m) throws DbException {
byte[] raw = db.getRawMessage(m);
byte[] raw;
Transaction txn = db.startTransaction();
try {
raw = db.getRawMessage(txn, m);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
@@ -192,10 +218,17 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
@Override
public void setReadFlag(MessageId m, boolean read) throws DbException {
BdfDictionary d = new BdfDictionary();
d.put("read", read);
try {
db.mergeMessageMetadata(m, metadataEncoder.encode(d));
BdfDictionary d = new BdfDictionary();
d.put("read", read);
Metadata meta = metadataEncoder.encode(d);
Transaction txn = db.startTransaction();
try {
db.mergeMessageMetadata(txn, m, meta);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (FormatException e) {
throw new RuntimeException(e);
}

View File

@@ -4,6 +4,7 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.TransportDisabledEvent;
import org.briarproject.api.event.TransportEnabledEvent;
@@ -27,6 +28,7 @@ import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
import org.briarproject.api.properties.TransportProperties;
import org.briarproject.api.properties.TransportPropertyManager;
import org.briarproject.api.settings.Settings;
import org.briarproject.api.settings.SettingsManager;
import org.briarproject.api.system.Clock;
import org.briarproject.api.ui.UiCallback;
@@ -60,6 +62,7 @@ class PluginManagerImpl implements PluginManager, Service {
private final DatabaseComponent db;
private final Poller poller;
private final ConnectionManager connectionManager;
private final SettingsManager settingsManager;
private final TransportPropertyManager transportPropertyManager;
private final UiCallback uiCallback;
private final Map<TransportId, Plugin> plugins;
@@ -72,6 +75,7 @@ class PluginManagerImpl implements PluginManager, Service {
DuplexPluginConfig duplexPluginConfig, Clock clock,
DatabaseComponent db, Poller poller,
ConnectionManager connectionManager,
SettingsManager settingsManager,
TransportPropertyManager transportPropertyManager,
UiCallback uiCallback) {
this.ioExecutor = ioExecutor;
@@ -82,6 +86,7 @@ class PluginManagerImpl implements PluginManager, Service {
this.db = db;
this.poller = poller;
this.connectionManager = connectionManager;
this.settingsManager = settingsManager;
this.transportPropertyManager = transportPropertyManager;
this.uiCallback = uiCallback;
plugins = new ConcurrentHashMap<TransportId, Plugin>();
@@ -181,7 +186,13 @@ class PluginManagerImpl implements PluginManager, Service {
}
try {
long start = clock.currentTimeMillis();
db.addTransport(id, plugin.getMaxLatency());
Transaction txn = db.startTransaction();
try {
db.addTransport(txn, id, plugin.getMaxLatency());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
long duration = clock.currentTimeMillis() - start;
if (LOG.isLoggable(INFO))
LOG.info("Adding transport took " + duration + " ms");
@@ -244,7 +255,13 @@ class PluginManagerImpl implements PluginManager, Service {
}
try {
long start = clock.currentTimeMillis();
db.addTransport(id, plugin.getMaxLatency());
Transaction txn = db.startTransaction();
try {
db.addTransport(txn, id, plugin.getMaxLatency());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
long duration = clock.currentTimeMillis() - start;
if (LOG.isLoggable(INFO))
LOG.info("Adding transport took " + duration + " ms");
@@ -319,7 +336,7 @@ class PluginManagerImpl implements PluginManager, Service {
public Settings getSettings() {
try {
return db.getSettings(id.getString());
return settingsManager.getSettings(id.getString());
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return new Settings();
@@ -348,7 +365,7 @@ class PluginManagerImpl implements PluginManager, Service {
public void mergeSettings(Settings s) {
try {
db.mergeSettings(s, id.getString());
settingsManager.mergeSettings(s, id.getString());
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}

View File

@@ -7,7 +7,6 @@ import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.contact.ContactManager.AddContactHook;
import org.briarproject.api.contact.ContactManager.RemoveContactHook;
import org.briarproject.api.data.BdfDictionary;
@@ -21,6 +20,7 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.NoSuchGroupException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.properties.TransportProperties;
import org.briarproject.api.properties.TransportPropertyManager;
import org.briarproject.api.sync.ClientId;
@@ -41,10 +41,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.properties.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
@@ -57,11 +54,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
private static final byte[] LOCAL_GROUP_DESCRIPTOR = new byte[0];
private static final Logger LOG =
Logger.getLogger(TransportPropertyManagerImpl.class.getName());
private final DatabaseComponent db;
private final ContactManager contactManager;
private final PrivateGroupFactory privateGroupFactory;
private final MessageFactory messageFactory;
private final BdfReaderFactory bdfReaderFactory;
@@ -71,18 +64,13 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
private final Clock clock;
private final Group localGroup;
/** Ensures isolation between database reads and writes. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Inject
TransportPropertyManagerImpl(DatabaseComponent db,
ContactManager contactManager, GroupFactory groupFactory,
PrivateGroupFactory privateGroupFactory,
GroupFactory groupFactory, PrivateGroupFactory privateGroupFactory,
MessageFactory messageFactory, BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser, Clock clock) {
this.db = db;
this.contactManager = contactManager;
this.privateGroupFactory = privateGroupFactory;
this.messageFactory = messageFactory;
this.bdfReaderFactory = bdfReaderFactory;
@@ -95,165 +83,156 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
}
@Override
public void addingContact(ContactId c) {
lock.writeLock().lock();
try {
// Create a group to share with the contact
Group g = getContactGroup(db.getContact(c));
// Store the group and share it with the contact
db.addGroup(g);
db.setVisibility(g.getId(), Collections.singletonList(c));
// Copy the latest local properties into the group
DeviceId dev = db.getDeviceId();
Map<TransportId, TransportProperties> local = getLocalProperties();
for (Entry<TransportId, TransportProperties> e : local.entrySet()) {
storeMessage(g.getId(), dev, e.getKey(), e.getValue(), 1, true,
true);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
public void addingContact(Transaction txn, Contact c) throws DbException {
// Create a group to share with the contact
Group g = getContactGroup(c);
// Store the group and share it with the contact
db.addGroup(txn, g);
db.setVisibleToContact(txn, c.getId(), g.getId(), true);
// Copy the latest local properties into the group
DeviceId dev = db.getDeviceId(txn);
Map<TransportId, TransportProperties> local = getLocalProperties(txn);
for (Entry<TransportId, TransportProperties> e : local.entrySet()) {
storeMessage(txn, g.getId(), dev, e.getKey(), e.getValue(), 1,
true, true);
}
}
@Override
public void removingContact(ContactId c) {
lock.writeLock().lock();
try {
db.removeGroup(getContactGroup(db.getContact(c)));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
public void removingContact(Transaction txn, Contact c) throws DbException {
db.removeGroup(txn, getContactGroup(c));
}
@Override
public void addRemoteProperties(ContactId c, DeviceId dev,
Map<TransportId, TransportProperties> props) throws DbException {
lock.writeLock().lock();
Transaction txn = db.startTransaction();
try {
Group g = getContactGroup(db.getContact(c));
Group g = getContactGroup(db.getContact(txn, c));
for (Entry<TransportId, TransportProperties> e : props.entrySet()) {
storeMessage(g.getId(), dev, e.getKey(), e.getValue(), 0, false,
false);
storeMessage(txn, g.getId(), dev, e.getKey(), e.getValue(), 0,
false, false);
}
} catch (FormatException e) {
throw new DbException(e);
txn.setComplete();
} finally {
lock.writeLock().unlock();
db.endTransaction(txn);
}
}
@Override
public Map<TransportId, TransportProperties> getLocalProperties()
throws DbException {
lock.readLock().lock();
Map<TransportId, TransportProperties> local;
Transaction txn = db.startTransaction();
try {
// Find the latest local update for each transport
Map<TransportId, LatestUpdate> latest =
findLatest(localGroup.getId(), true);
// Retrieve and parse the latest local properties
Map<TransportId, TransportProperties> local =
new HashMap<TransportId, TransportProperties>();
for (Entry<TransportId, LatestUpdate> e : latest.entrySet()) {
byte[] raw = db.getRawMessage(e.getValue().messageId);
local.put(e.getKey(), parseProperties(raw));
}
return Collections.unmodifiableMap(local);
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return Collections.emptyMap();
} catch (IOException e) {
throw new DbException(e);
local = getLocalProperties(txn);
txn.setComplete();
} finally {
lock.readLock().unlock();
db.endTransaction(txn);
}
return Collections.unmodifiableMap(local);
}
@Override
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
lock.readLock().lock();
try {
// Find the latest local update
LatestUpdate latest = findLatest(localGroup.getId(), t, true);
if (latest == null) return null;
// Retrieve and parse the latest local properties
return parseProperties(db.getRawMessage(latest.messageId));
TransportProperties p = null;
Transaction txn = db.startTransaction();
try {
// Find the latest local update
LatestUpdate latest = findLatest(txn, localGroup.getId(), t,
true);
if (latest != null) {
// Retrieve and parse the latest local properties
byte[] raw = db.getRawMessage(txn, latest.messageId);
p = parseProperties(raw);
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return p;
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return null;
} catch (IOException e) {
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
lock.readLock().lock();
try {
Map<ContactId, TransportProperties> remote =
new HashMap<ContactId, TransportProperties>();
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
// Find the latest remote update
LatestUpdate latest = findLatest(g.getId(), t, false);
if (latest != null) {
// Retrieve and parse the latest remote properties
byte[] raw = db.getRawMessage(latest.messageId);
remote.put(c.getId(), parseProperties(raw));
Transaction txn = db.startTransaction();
try {
for (Contact c : db.getContacts(txn)) {
Group g = getContactGroup(c);
// Find the latest remote update
LatestUpdate latest = findLatest(txn, g.getId(), t, false);
if (latest != null) {
// Retrieve and parse the latest remote properties
byte[] raw = db.getRawMessage(txn, latest.messageId);
remote.put(c.getId(), parseProperties(raw));
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return Collections.unmodifiableMap(remote);
} catch (IOException e) {
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
lock.writeLock().lock();
try {
// Create the local group if necessary
db.addGroup(localGroup);
// Merge the new properties with any existing properties
TransportProperties merged;
LatestUpdate latest = findLatest(localGroup.getId(), t, true);
if (latest == null) {
merged = p;
} else {
byte[] raw = db.getRawMessage(latest.messageId);
TransportProperties old = parseProperties(raw);
merged = new TransportProperties(old);
merged.putAll(p);
if (merged.equals(old)) return; // Unchanged
Transaction txn = db.startTransaction();
try {
// Create the local group if necessary
db.addGroup(txn, localGroup);
// Merge the new properties with any existing properties
TransportProperties merged;
boolean changed;
LatestUpdate latest = findLatest(txn, localGroup.getId(), t,
true);
if (latest == null) {
merged = p;
changed = true;
} else {
byte[] raw = db.getRawMessage(txn, latest.messageId);
TransportProperties old = parseProperties(raw);
merged = new TransportProperties(old);
merged.putAll(p);
changed = !merged.equals(old);
}
if (changed) {
// Store the merged properties in the local group
DeviceId dev = db.getDeviceId(txn);
long version = latest == null ? 1 : latest.version + 1;
storeMessage(txn, localGroup.getId(), dev, t, merged,
version, true, false);
// Store the merged properties in each contact's group
for (Contact c : db.getContacts(txn)) {
Group g = getContactGroup(c);
latest = findLatest(txn, g.getId(), t, true);
version = latest == null ? 1 : latest.version + 1;
storeMessage(txn, g.getId(), dev, t, merged, version,
true, true);
}
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
// Store the merged properties in the local group
DeviceId dev = db.getDeviceId();
long version = latest == null ? 1 : latest.version + 1;
storeMessage(localGroup.getId(), dev, t, merged, version, true,
false);
// Store the merged properties in each contact's group
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
latest = findLatest(g.getId(), t, true);
version = latest == null ? 1 : latest.version + 1;
storeMessage(g.getId(), dev, t, merged, version, true, true);
}
} catch (IOException e) {
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@@ -261,18 +240,44 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return privateGroupFactory.createPrivateGroup(CLIENT_ID, c);
}
// Locking: lock.writeLock
private void storeMessage(GroupId g, DeviceId dev, TransportId t,
TransportProperties p, long version, boolean local, boolean shared)
throws DbException, FormatException {
byte[] body = encodeProperties(dev, t, p, version);
long now = clock.currentTimeMillis();
Message m = messageFactory.createMessage(g, now, body);
BdfDictionary d = new BdfDictionary();
d.put("transportId", t.getString());
d.put("version", version);
d.put("local", local);
db.addLocalMessage(m, CLIENT_ID, metadataEncoder.encode(d), shared);
private Map<TransportId, TransportProperties> getLocalProperties(
Transaction txn) throws DbException {
try {
Map<TransportId, TransportProperties> local =
new HashMap<TransportId, TransportProperties>();
// Find the latest local update for each transport
Map<TransportId, LatestUpdate> latest = findLatest(txn,
localGroup.getId(), true);
// Retrieve and parse the latest local properties
for (Entry<TransportId, LatestUpdate> e : latest.entrySet()) {
byte[] raw = db.getRawMessage(txn, e.getValue().messageId);
local.put(e.getKey(), parseProperties(raw));
}
return local;
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return Collections.emptyMap();
} catch (FormatException e) {
throw new DbException(e);
}
}
private void storeMessage(Transaction txn, GroupId g, DeviceId dev,
TransportId t, TransportProperties p, long version, boolean local,
boolean shared) throws DbException {
try {
byte[] body = encodeProperties(dev, t, p, version);
long now = clock.currentTimeMillis();
Message m = messageFactory.createMessage(g, now, body);
BdfDictionary d = new BdfDictionary();
d.put("transportId", t.getString());
d.put("version", version);
d.put("local", local);
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(txn, m, CLIENT_ID, meta, shared);
} catch (FormatException e) {
throw new RuntimeException(e);
}
}
private byte[] encodeProperties(DeviceId dev, TransportId t,
@@ -293,12 +298,11 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return out.toByteArray();
}
// Locking: lock.readLock
private Map<TransportId, LatestUpdate> findLatest(GroupId g, boolean local)
throws DbException, FormatException {
private Map<TransportId, LatestUpdate> findLatest(Transaction txn,
GroupId g, boolean local) throws DbException, FormatException {
Map<TransportId, LatestUpdate> latestUpdates =
new HashMap<TransportId, LatestUpdate>();
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Map<MessageId, Metadata> metadata = db.getMessageMetadata(txn, g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getBoolean("local") == local) {
@@ -312,11 +316,10 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return latestUpdates;
}
// Locking: lock.readLock
private LatestUpdate findLatest(GroupId g, TransportId t, boolean local)
throws DbException, FormatException {
private LatestUpdate findLatest(Transaction txn, GroupId g, TransportId t,
boolean local) throws DbException, FormatException {
LatestUpdate latest = null;
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Map<MessageId, Metadata> metadata = db.getMessageMetadata(txn, g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getString("transportId").equals(t.getString())
@@ -330,25 +333,32 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
}
private TransportProperties parseProperties(byte[] raw)
throws IOException {
throws FormatException {
TransportProperties p = new TransportProperties();
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
r.readListStart();
r.skipRaw(); // Device ID
r.skipString(); // Transport ID
r.skipInteger(); // Version
r.readDictionaryStart();
while (!r.hasDictionaryEnd()) {
String key = r.readString(MAX_PROPERTY_LENGTH);
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
try {
r.readListStart();
r.skipRaw(); // Device ID
r.skipString(); // Transport ID
r.skipInteger(); // Version
r.readDictionaryStart();
while (!r.hasDictionaryEnd()) {
String key = r.readString(MAX_PROPERTY_LENGTH);
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
}
r.readDictionaryEnd();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return p;
} catch (FormatException e) {
throw e;
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);
}
r.readDictionaryEnd();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return p;
}
private static class LatestUpdate {

View File

@@ -4,6 +4,7 @@ import com.google.inject.Inject;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.settings.Settings;
import org.briarproject.api.settings.SettingsManager;
@@ -18,11 +19,25 @@ class SettingsManagerImpl implements SettingsManager {
@Override
public Settings getSettings(String namespace) throws DbException {
return db.getSettings(namespace);
Settings s;
Transaction txn = db.startTransaction();
try {
s = db.getSettings(txn, namespace);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return s;
}
@Override
public void mergeSettings(Settings s, String namespace) throws DbException {
db.mergeSettings(s, namespace);
Transaction txn = db.startTransaction();
try {
db.mergeSettings(txn, s, namespace);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
}
}

View File

@@ -4,6 +4,7 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
@@ -50,8 +51,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private static final ThrowingRunnable<IOException> CLOSE =
new ThrowingRunnable<IOException>() {
public void run() {}
};
public void run() {}
};
private final DatabaseComponent db;
private final Executor dbExecutor;
@@ -178,7 +179,14 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Ack a = db.generateAck(contactId, MAX_MESSAGE_IDS);
Ack a;
Transaction txn = db.startTransaction();
try {
a = db.generateAck(txn, contactId, MAX_MESSAGE_IDS);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a != null) writerTasks.add(new WriteAck(a));
@@ -212,8 +220,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Collection<byte[]> b = db.generateRequestedBatch(contactId,
MAX_PACKET_PAYLOAD_LENGTH, maxLatency);
Collection<byte[]> b;
Transaction txn = db.startTransaction();
try {
b = db.generateRequestedBatch(txn, contactId,
MAX_PACKET_PAYLOAD_LENGTH, maxLatency);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b != null) writerTasks.add(new WriteBatch(b));
@@ -247,8 +262,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Offer o = db.generateOffer(contactId, MAX_MESSAGE_IDS,
maxLatency);
Offer o;
Transaction txn = db.startTransaction();
try {
o = db.generateOffer(txn, contactId, MAX_MESSAGE_IDS,
maxLatency);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if (o != null) writerTasks.add(new WriteOffer(o));
@@ -282,7 +304,14 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Request r = db.generateRequest(contactId, MAX_MESSAGE_IDS);
Request r;
Transaction txn = db.startTransaction();
try {
r = db.generateRequest(txn, contactId, MAX_MESSAGE_IDS);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if (r != null) writerTasks.add(new WriteRequest(r));

View File

@@ -5,6 +5,7 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
@@ -24,7 +25,9 @@ import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
/** An incoming {@link org.briarproject.api.sync.SyncSession SyncSession}. */
/**
* An incoming {@link org.briarproject.api.sync.SyncSession SyncSession}.
*/
class IncomingSession implements SyncSession, EventListener {
private static final Logger LOG =
@@ -103,7 +106,13 @@ class IncomingSession implements SyncSession, EventListener {
public void run() {
try {
db.receiveAck(contactId, ack);
Transaction txn = db.startTransaction();
try {
db.receiveAck(txn, contactId, ack);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
@@ -121,7 +130,13 @@ class IncomingSession implements SyncSession, EventListener {
public void run() {
try {
db.receiveMessage(contactId, message);
Transaction txn = db.startTransaction();
try {
db.receiveMessage(txn, contactId, message);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
@@ -139,7 +154,13 @@ class IncomingSession implements SyncSession, EventListener {
public void run() {
try {
db.receiveOffer(contactId, offer);
Transaction txn = db.startTransaction();
try {
db.receiveOffer(txn, contactId, offer);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
@@ -157,7 +178,13 @@ class IncomingSession implements SyncSession, EventListener {
public void run() {
try {
db.receiveRequest(contactId, request);
Transaction txn = db.startTransaction();
try {
db.receiveRequest(txn, contactId, request);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();

View File

@@ -4,6 +4,7 @@ import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
@@ -40,8 +41,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private static final ThrowingRunnable<IOException> CLOSE =
new ThrowingRunnable<IOException>() {
public void run() {}
};
public void run() {}
};
private final DatabaseComponent db;
private final Executor dbExecutor;
@@ -119,7 +120,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Ack a = db.generateAck(contactId, MAX_MESSAGE_IDS);
Ack a;
Transaction txn = db.startTransaction();
try {
a = db.generateAck(txn, contactId, MAX_MESSAGE_IDS);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
@@ -154,8 +162,15 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Collection<byte[]> b = db.generateBatch(contactId,
MAX_PACKET_PAYLOAD_LENGTH, maxLatency);
Collection<byte[]> b;
Transaction txn = db.startTransaction();
try {
b = db.generateBatch(txn, contactId,
MAX_PACKET_PAYLOAD_LENGTH, maxLatency);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();

View File

@@ -3,10 +3,7 @@ package org.briarproject.sync;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorFactory;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.MessageFactory;
@@ -22,7 +19,6 @@ public class SyncModule extends AbstractModule {
@Override
protected void configure() {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
bind(GroupFactory.class).to(GroupFactoryImpl.class);
bind(MessageFactory.class).to(MessageFactoryImpl.class);
bind(PacketReaderFactory.class).to(PacketReaderFactoryImpl.class);
@@ -32,11 +28,6 @@ public class SyncModule extends AbstractModule {
SyncSessionFactoryImpl.class).in(Singleton.class);
}
@Provides
ObjectReader<Author> getAuthorReader(AuthorFactory authorFactory) {
return new AuthorReader(authorFactory);
}
@Provides @Singleton
ValidationManager getValidationManager(LifecycleManager lifecycleManager,
EventBus eventBus, ValidationManagerImpl validationManager) {

View File

@@ -9,7 +9,7 @@ import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.NoSuchGroupException;
import org.briarproject.api.db.NoSuchMessageException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.MessageAddedEvent;
@@ -82,14 +82,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
public void run() {
try {
// TODO: Don't do all of this in a single DB task
for (MessageId id : db.getMessagesToValidate(c)) {
try {
Message m = parseMessage(id, db.getRawMessage(id));
Group g = db.getGroup(m.getGroupId());
Transaction txn = db.startTransaction();
try {
for (MessageId id : db.getMessagesToValidate(txn, c)) {
byte[] raw = db.getRawMessage(txn, id);
Message m = parseMessage(id, raw);
Group g = db.getGroup(txn, m.getGroupId());
validateMessage(m, g);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before validation");
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
@@ -127,17 +130,21 @@ class ValidationManagerImpl implements ValidationManager, Service,
dbExecutor.execute(new Runnable() {
public void run() {
try {
if (meta == null) {
db.setMessageValid(m, c, false);
} else {
for (ValidationHook hook : hooks)
hook.validatingMessage(m, c, meta);
db.mergeMessageMetadata(m.getId(), meta);
db.setMessageValid(m, c, true);
db.setMessageShared(m, true);
Transaction txn = db.startTransaction();
try {
if (meta == null) {
db.setMessageValid(txn, m, c, false);
} else {
db.mergeMessageMetadata(txn, m.getId(), meta);
db.setMessageValid(txn, m, c, true);
db.setMessageShared(txn, m, true);
for (ValidationHook hook : hooks)
hook.validatingMessage(txn, m, c, meta);
}
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
@@ -159,7 +166,13 @@ class ValidationManagerImpl implements ValidationManager, Service,
dbExecutor.execute(new Runnable() {
public void run() {
try {
validateMessage(m, db.getGroup(m.getGroupId()));
Transaction txn = db.startTransaction();
try {
validateMessage(m, db.getGroup(txn, m.getGroupId()));
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
} catch (DbException e) {

View File

@@ -7,6 +7,7 @@ import org.briarproject.api.crypto.SecretKey;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.ContactRemovedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
@@ -55,7 +56,14 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
@Override
public boolean start() {
try {
Map<TransportId, Integer> latencies = db.getTransportLatencies();
Map<TransportId, Integer> latencies;
Transaction txn = db.startTransaction();
try {
latencies = db.getTransportLatencies(txn);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
for (Entry<TransportId, Integer> e : latencies.entrySet())
addTransport(e.getKey(), e.getValue());
} catch (DbException e) {

View File

@@ -7,6 +7,7 @@ import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.crypto.SecretKey;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.system.Clock;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.StreamContext;
@@ -66,7 +67,13 @@ class TransportKeyManager extends TimerTask {
// Load the transport keys from the DB
Map<ContactId, TransportKeys> loaded;
try {
loaded = db.getTransportKeys(transportId);
Transaction txn = db.startTransaction();
try {
loaded = db.getTransportKeys(txn, transportId);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return;
@@ -90,7 +97,13 @@ class TransportKeyManager extends TimerTask {
for (Entry<ContactId, TransportKeys> e : current.entrySet())
addKeys(e.getKey(), new MutableTransportKeys(e.getValue()));
// Write any rotated keys back to the DB
db.updateTransportKeys(rotated);
Transaction txn = db.startTransaction();
try {
db.updateTransportKeys(txn, rotated);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
@@ -135,7 +148,13 @@ class TransportKeyManager extends TimerTask {
// Initialise mutable state for the contact
addKeys(c, new MutableTransportKeys(k));
// Write the keys back to the DB
db.addTransportKeys(c, k);
Transaction txn = db.startTransaction();
try {
db.addTransportKeys(txn, c, k);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
@@ -171,8 +190,14 @@ class TransportKeyManager extends TimerTask {
outKeys.getStreamCounter());
// Increment the stream counter and write it back to the DB
outKeys.incrementStreamCounter();
db.incrementStreamCounter(c, transportId,
outKeys.getRotationPeriod());
Transaction txn = db.startTransaction();
try {
db.incrementStreamCounter(txn, c, transportId,
outKeys.getRotationPeriod());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return ctx;
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -210,9 +235,15 @@ class TransportKeyManager extends TimerTask {
inContexts.remove(new Bytes(removeTag));
}
// Write the window back to the DB
db.setReorderingWindow(tagCtx.contactId, transportId,
inKeys.getRotationPeriod(), window.getBase(),
window.getBitmap());
Transaction txn = db.startTransaction();
try {
db.setReorderingWindow(txn, tagCtx.contactId, transportId,
inKeys.getRotationPeriod(), window.getBase(),
window.getBitmap());
txn.setComplete();
} finally {
db.endTransaction(txn);
}
return ctx;
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -249,7 +280,13 @@ class TransportKeyManager extends TimerTask {
for (Entry<ContactId, TransportKeys> e : current.entrySet())
addKeys(e.getKey(), new MutableTransportKeys(e.getValue()));
// Write any rotated keys back to the DB
db.updateTransportKeys(rotated);
Transaction txn = db.startTransaction();
try {
db.updateTransportKeys(txn, rotated);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {

View File

@@ -0,0 +1,14 @@
package org.briarproject.contact;
import org.briarproject.BriarTestCase;
import org.junit.Test;
import static org.junit.Assert.fail;
public class ContactManagerImplTest extends BriarTestCase {
@Test
public void testUnitTestsExist() {
fail(); // FIXME: Write tests
}
}

View File

@@ -8,7 +8,6 @@ import org.briarproject.api.contact.ContactId;
import org.briarproject.api.crypto.SecretKey;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.db.StorageStatus;
import org.briarproject.api.identity.Author;
import org.briarproject.api.identity.AuthorId;
import org.briarproject.api.identity.LocalAuthor;
@@ -87,8 +86,7 @@ public class H2DatabaseTest extends BriarTestCase {
localAuthorId = new AuthorId(TestUtils.getRandomId());
timestamp = System.currentTimeMillis();
localAuthor = new LocalAuthor(localAuthorId, "Bob",
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp,
StorageStatus.ACTIVE);
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp);
messageId = new MessageId(TestUtils.getRandomId());
size = 1234;
raw = new byte[size];
@@ -1060,8 +1058,7 @@ public class H2DatabaseTest extends BriarTestCase {
throws Exception {
AuthorId localAuthorId1 = new AuthorId(TestUtils.getRandomId());
LocalAuthor localAuthor1 = new LocalAuthor(localAuthorId1, "Carol",
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp,
StorageStatus.ACTIVE);
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp);
Database<Connection> db = open(false);
Connection txn = db.startTransaction();

View File

@@ -0,0 +1,14 @@
package org.briarproject.identity;
import org.briarproject.BriarTestCase;
import org.junit.Test;
import static org.junit.Assert.fail;
public class IdentityManagerImplTest extends BriarTestCase {
@Test
public void testUnitTestsExist() {
fail(); // FIXME: Write tests
}
}

View File

@@ -3,6 +3,7 @@ package org.briarproject.plugins;
import org.briarproject.BriarTestCase;
import org.briarproject.api.TransportId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
@@ -14,6 +15,7 @@ import org.briarproject.api.plugins.simplex.SimplexPluginCallback;
import org.briarproject.api.plugins.simplex.SimplexPluginConfig;
import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
import org.briarproject.api.properties.TransportPropertyManager;
import org.briarproject.api.settings.SettingsManager;
import org.briarproject.api.system.Clock;
import org.briarproject.api.ui.UiCallback;
import org.briarproject.system.SystemClock;
@@ -46,6 +48,8 @@ public class PluginManagerImplTest extends BriarTestCase {
final Poller poller = context.mock(Poller.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final SettingsManager settingsManager =
context.mock(SettingsManager.class);
final TransportPropertyManager transportPropertyManager =
context.mock(TransportPropertyManager.class);
final UiCallback uiCallback = context.mock(UiCallback.class);
@@ -55,18 +59,21 @@ public class PluginManagerImplTest extends BriarTestCase {
final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class);
final TransportId simplexId = new TransportId("simplex");
final int simplexLatency = 12345;
final Transaction simplexTxn = new Transaction(null);
final SimplexPluginFactory simplexFailFactory =
context.mock(SimplexPluginFactory.class, "simplexFailFactory");
final SimplexPlugin simplexFailPlugin =
context.mock(SimplexPlugin.class, "simplexFailPlugin");
final TransportId simplexFailId = new TransportId("simplex1");
final int simplexFailLatency = 23456;
final Transaction simplexFailTxn = new Transaction(null);
// Two duplex plugin factories: one creates a plugin, the other fails
final DuplexPluginFactory duplexFactory =
context.mock(DuplexPluginFactory.class);
final DuplexPlugin duplexPlugin = context.mock(DuplexPlugin.class);
final TransportId duplexId = new TransportId("duplex");
final int duplexLatency = 34567;
final Transaction duplexTxn = new Transaction(null);
final DuplexPluginFactory duplexFailFactory =
context.mock(DuplexPluginFactory.class, "duplexFailFactory");
final TransportId duplexFailId = new TransportId("duplex1");
@@ -82,7 +89,10 @@ public class PluginManagerImplTest extends BriarTestCase {
will(returnValue(simplexPlugin)); // Created
oneOf(simplexPlugin).getMaxLatency();
will(returnValue(simplexLatency));
oneOf(db).addTransport(simplexId, simplexLatency);
oneOf(db).startTransaction();
will(returnValue(simplexTxn));
oneOf(db).addTransport(simplexTxn, simplexId, simplexLatency);
oneOf(db).endTransaction(simplexTxn);
oneOf(simplexPlugin).start();
will(returnValue(true)); // Started
oneOf(simplexPlugin).shouldPoll();
@@ -96,7 +106,11 @@ public class PluginManagerImplTest extends BriarTestCase {
will(returnValue(simplexFailPlugin)); // Created
oneOf(simplexFailPlugin).getMaxLatency();
will(returnValue(simplexFailLatency));
oneOf(db).addTransport(simplexFailId, simplexFailLatency);
oneOf(db).startTransaction();
will(returnValue(simplexFailTxn));
oneOf(db).addTransport(simplexFailTxn, simplexFailId,
simplexFailLatency);
oneOf(db).endTransaction(simplexFailTxn);
oneOf(simplexFailPlugin).start();
will(returnValue(false)); // Failed to start
// First duplex plugin
@@ -109,7 +123,10 @@ public class PluginManagerImplTest extends BriarTestCase {
will(returnValue(duplexPlugin)); // Created
oneOf(duplexPlugin).getMaxLatency();
will(returnValue(duplexLatency));
oneOf(db).addTransport(duplexId, duplexLatency);
oneOf(db).startTransaction();
will(returnValue(duplexTxn));
oneOf(db).addTransport(duplexTxn, duplexId, duplexLatency);
oneOf(db).endTransaction(duplexTxn);
oneOf(duplexPlugin).start();
will(returnValue(true)); // Started
oneOf(duplexPlugin).shouldPoll();
@@ -128,7 +145,8 @@ public class PluginManagerImplTest extends BriarTestCase {
}});
PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus,
simplexPluginConfig, duplexPluginConfig, clock, db, poller,
connectionManager, transportPropertyManager, uiCallback);
connectionManager, settingsManager, transportPropertyManager,
uiCallback);
// Two plugins should be started and stopped
assertTrue(p.start());

View File

@@ -12,7 +12,7 @@ import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.crypto.SecretKey;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.StorageStatus;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
@@ -120,11 +120,16 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
lifecycleManager.startServices();
lifecycleManager.waitForStartup();
// Add a transport
db.addTransport(transportId, MAX_LATENCY);
Transaction txn = db.startTransaction();
try {
db.addTransport(txn, transportId, MAX_LATENCY);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
// Add an identity for Alice
LocalAuthor aliceAuthor = new LocalAuthor(aliceId, "Alice",
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp,
StorageStatus.ADDING);
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp);
identityManager.addLocalAuthor(aliceAuthor);
// Add Bob as a contact
Author bobAuthor = new Author(bobId, "Bob",
@@ -185,11 +190,16 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
lifecycleManager.startServices();
lifecycleManager.waitForStartup();
// Add a transport
db.addTransport(transportId, MAX_LATENCY);
Transaction txn = db.startTransaction();
try {
db.addTransport(txn, transportId, MAX_LATENCY);
txn.setComplete();
} finally {
db.endTransaction(txn);
}
// Add an identity for Bob
LocalAuthor bobAuthor = new LocalAuthor(bobId, "Bob",
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp,
StorageStatus.ADDING);
new byte[MAX_PUBLIC_KEY_LENGTH], new byte[123], timestamp);
identityManager.addLocalAuthor(bobAuthor);
// Add Alice as a contact
Author aliceAuthor = new Author(aliceId, "Alice",

View File

@@ -5,6 +5,7 @@ import org.briarproject.TestUtils;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.Transaction;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.sync.Ack;
import org.briarproject.api.sync.MessageId;
@@ -49,16 +50,24 @@ public class SimplexOutgoingSessionTest extends BriarTestCase {
final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, maxLatency,
packetWriter);
final Transaction noAckTxn = new Transaction(null);
final Transaction noMsgTxn = new Transaction(null);
context.checking(new Expectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// No acks to send
oneOf(db).generateAck(contactId, MAX_MESSAGE_IDS);
oneOf(db).startTransaction();
will(returnValue(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
oneOf(db).endTransaction(noAckTxn);
// No messages to send
oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(maxLatency));
oneOf(db).startTransaction();
will(returnValue(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(maxLatency));
will(returnValue(null));
oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream
oneOf(packetWriter).flush();
// Remove listener
@@ -75,25 +84,41 @@ public class SimplexOutgoingSessionTest extends BriarTestCase {
final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, maxLatency,
packetWriter);
final Transaction ackTxn = new Transaction(null);
final Transaction noAckTxn = new Transaction(null);
final Transaction msgTxn = new Transaction(null);
final Transaction noMsgTxn = new Transaction(null);
context.checking(new Expectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// One ack to send
oneOf(db).generateAck(contactId, MAX_MESSAGE_IDS);
oneOf(db).startTransaction();
will(returnValue(ackTxn));
oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(ack));
oneOf(db).endTransaction(ackTxn);
oneOf(packetWriter).writeAck(ack);
// No more acks
oneOf(db).generateAck(contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
// One message to send
oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(maxLatency));
oneOf(db).startTransaction();
will(returnValue(msgTxn));
oneOf(db).generateBatch(with(msgTxn), with(contactId),
with(any(int.class)), with(maxLatency));
will(returnValue(Arrays.asList(raw)));
oneOf(db).endTransaction(msgTxn);
oneOf(packetWriter).writeMessage(raw);
// No more messages
oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(maxLatency));
// No more acks
oneOf(db).startTransaction();
will(returnValue(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
oneOf(db).endTransaction(noAckTxn);
// No more messages
oneOf(db).startTransaction();
will(returnValue(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(maxLatency));
will(returnValue(null));
oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream
oneOf(packetWriter).flush();
// Remove listener