Added new database events and simplified database locking.

This commit is contained in:
akwizgran
2013-01-25 16:27:50 +00:00
parent 64bf1fbbb1
commit 9b98d3c7d6
11 changed files with 467 additions and 436 deletions

View File

@@ -9,20 +9,20 @@ import net.sf.briar.api.ContactId;
* An event that is broadcast when the set of subscriptions visible to one or * An event that is broadcast when the set of subscriptions visible to one or
* more contacts is updated. * more contacts is updated.
*/ */
public class SubscriptionsUpdatedEvent extends DatabaseEvent { public class LocalSubscriptionsUpdatedEvent extends DatabaseEvent {
private final Collection<ContactId> affectedContacts; private final Collection<ContactId> affected;
public SubscriptionsUpdatedEvent() { public LocalSubscriptionsUpdatedEvent() {
affectedContacts = Collections.emptyList(); affected = Collections.emptyList();
} }
public SubscriptionsUpdatedEvent(Collection<ContactId> affectedContacts) { public LocalSubscriptionsUpdatedEvent(Collection<ContactId> affected) {
this.affectedContacts = affectedContacts; this.affected = affected;
} }
/** Returns the contacts affected by the update. */ /** Returns the contacts affected by the update. */
public Collection<ContactId> getAffectedContacts() { public Collection<ContactId> getAffectedContacts() {
return affectedContacts; return affected;
} }
} }

View File

@@ -4,6 +4,6 @@ package net.sf.briar.api.db.event;
* An event that is broadcast when the local transport properties are * An event that is broadcast when the local transport properties are
* updated. * updated.
*/ */
public class TransportsUpdatedEvent extends DatabaseEvent { public class LocalTransportsUpdatedEvent extends DatabaseEvent {
} }

View File

@@ -0,0 +1,17 @@
package net.sf.briar.api.db.event;
import net.sf.briar.api.ContactId;
/** An event that is broadcast when a contact's subscriptions are updated. */
public class RemoteSubscriptionsUpdatedEvent extends DatabaseEvent {
private final ContactId contactId;
public RemoteSubscriptionsUpdatedEvent(ContactId contactId) {
this.contactId = contactId;
}
public ContactId getContactId() {
return contactId;
}
}

View File

@@ -0,0 +1,28 @@
package net.sf.briar.api.db.event;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.protocol.TransportId;
/**
* An event that is broadcast when a contact's remote transport properties
* are updated.
*/
public class RemoteTransportsUpdatedEvent extends DatabaseEvent {
private final ContactId contactId;
private final TransportId transportId;
public RemoteTransportsUpdatedEvent(ContactId contactId,
TransportId transportId) {
this.contactId = contactId;
this.transportId = transportId;
}
public ContactId getContactId() {
return contactId;
}
public TransportId getTransportId() {
return transportId;
}
}

View File

@@ -0,0 +1,17 @@
package net.sf.briar.api.db.event;
import net.sf.briar.api.protocol.TransportId;
/** An event that is broadcast when a transport is added. */
public class TransportAddedEvent extends DatabaseEvent {
private final TransportId transportId;
public TransportAddedEvent(TransportId transportId) {
this.transportId = transportId;
}
public TransportId getTransportId() {
return transportId;
}
}

View File

@@ -0,0 +1,17 @@
package net.sf.briar.api.db.event;
import net.sf.briar.api.protocol.TransportId;
/** An event that is broadcast when a transport is removed. */
public class TransportRemovedEvent extends DatabaseEvent {
private final TransportId transportId;
public TransportRemovedEvent(TransportId transportId) {
this.transportId = transportId;
}
public TransportId getTransportId() {
return transportId;
}
}

View File

@@ -31,12 +31,10 @@ import net.sf.briar.api.transport.TemporarySecret;
* {@link #commitTransaction(T)}, even if an exception is thrown. * {@link #commitTransaction(T)}, even if an exception is thrown.
* <p> * <p>
* Locking is provided by the DatabaseComponent implementation. To prevent * Locking is provided by the DatabaseComponent implementation. To prevent
* deadlock, locks must be acquired in the following order: * deadlock, locks must be acquired in the following (alphabetical) order:
* <ul> * <ul>
* <li> contact * <li> contact
* <li> message * <li> message
* <li> messageFlag
* <li> messageStatus
* <li> rating * <li> rating
* <li> subscription * <li> subscription
* <li> transport * <li> transport
@@ -83,7 +81,7 @@ interface Database<T> {
/** /**
* Adds a contact transport to the database. * Adds a contact transport to the database.
* <p> * <p>
* Locking: contact read, window write. * Locking: contact read, transport read, window write.
*/ */
void addContactTransport(T txn, ContactTransport ct) throws DbException; void addContactTransport(T txn, ContactTransport ct) throws DbException;
@@ -98,14 +96,14 @@ interface Database<T> {
/** /**
* Records a received message as needing to be acknowledged. * Records a received message as needing to be acknowledged.
* <p> * <p>
* Locking: contact read, messageStatus write. * Locking: contact read, message write.
*/ */
void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException; void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException;
/** /**
* Records a collection of sent messages as needing to be acknowledged. * Records a collection of sent messages as needing to be acknowledged.
* <p> * <p>
* Locking: contact read, message read, messageStatus write. * Locking: contact read, message write.
*/ */
void addOutstandingMessages(T txn, ContactId c, Collection<MessageId> sent) void addOutstandingMessages(T txn, ContactId c, Collection<MessageId> sent)
throws DbException; throws DbException;
@@ -122,7 +120,7 @@ interface Database<T> {
* Stores the given temporary secrets and deletes any secrets that have * Stores the given temporary secrets and deletes any secrets that have
* been made obsolete. * been made obsolete.
* <p> * <p>
* Locking: contact read, window write. * Locking: contact read, transport read, window write.
*/ */
void addSecrets(T txn, Collection<TemporarySecret> secrets) void addSecrets(T txn, Collection<TemporarySecret> secrets)
throws DbException; throws DbException;
@@ -158,7 +156,7 @@ interface Database<T> {
/** /**
* Returns true if the database contains the given contact transport. * Returns true if the database contains the given contact transport.
* <p> * <p>
* Locking: contact read, window read. * Locking: contact read, transport read, window read.
*/ */
boolean containsContactTransport(T txn, ContactId c, TransportId t) boolean containsContactTransport(T txn, ContactId c, TransportId t)
throws DbException; throws DbException;
@@ -203,7 +201,7 @@ interface Database<T> {
/** /**
* Returns all contact transports. * Returns all contact transports.
* <p> * <p>
* Locking: contact read, window read. * Locking: contact read, transport read, window read.
*/ */
Collection<ContactTransport> getContactTransports(T txn) throws DbException; Collection<ContactTransport> getContactTransports(T txn) throws DbException;
@@ -257,7 +255,7 @@ interface Database<T> {
/** /**
* Returns the headers of all messages in the given group. * Returns the headers of all messages in the given group.
* <p> * <p>
* Locking: message read, messageFlag read. * Locking: message read.
*/ */
Collection<MessageHeader> getMessageHeaders(T txn, GroupId g) Collection<MessageHeader> getMessageHeaders(T txn, GroupId g)
throws DbException; throws DbException;
@@ -267,8 +265,7 @@ interface Database<T> {
* if the message is not present in the database or is not sendable to the * if the message is not present in the database or is not sendable to the
* given contact. * given contact.
* <p> * <p>
* Locking: contact read, message read, messageStatus read, * Locking: contact read, message read, subscription read.
* subscription read.
*/ */
byte[] getMessageIfSendable(T txn, ContactId c, MessageId m) byte[] getMessageIfSendable(T txn, ContactId c, MessageId m)
throws DbException; throws DbException;
@@ -285,7 +282,7 @@ interface Database<T> {
* Returns the IDs of some messages received from the given contact that * Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages. * need to be acknowledged, up to the given number of messages.
* <p> * <p>
* Locking: contact read, messageStatus read. * Locking: contact read, message read.
*/ */
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages) Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException; throws DbException;
@@ -294,8 +291,7 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be sent to the * Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given number of messages. * given contact, up to the given number of messages.
* <p> * <p>
* Locking: contact read, message read, messageStatus read, * Locking: contact read, message read, subscription read.
* subscription read.
*/ */
Collection<MessageId> getMessagesToOffer(T txn, ContactId c, Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException; int maxMessages) throws DbException;
@@ -327,7 +323,7 @@ interface Database<T> {
/** /**
* Returns true if the given message has been read. * Returns true if the given message has been read.
* <p> * <p>
* Locking: message read, messageFlag read. * Locking: message read.
*/ */
boolean getReadFlag(T txn, MessageId m) throws DbException; boolean getReadFlag(T txn, MessageId m) throws DbException;
@@ -342,7 +338,7 @@ interface Database<T> {
/** /**
* Returns all temporary secrets. * Returns all temporary secrets.
* <p> * <p>
* Locking: contact read, window read. * Locking: contact read, transport read, window read.
*/ */
Collection<TemporarySecret> getSecrets(T txn) throws DbException; Collection<TemporarySecret> getSecrets(T txn) throws DbException;
@@ -358,8 +354,7 @@ interface Database<T> {
* given contact, with a total length less than or equal to the given * given contact, with a total length less than or equal to the given
* length. * length.
* <p> * <p>
* Locking: contact read, message read, messageStatus read, * Locking: contact read, message read, subscription read.
* subscription read.
*/ */
Collection<MessageId> getSendableMessages(T txn, ContactId c, int maxLength) Collection<MessageId> getSendableMessages(T txn, ContactId c, int maxLength)
throws DbException; throws DbException;
@@ -367,7 +362,7 @@ interface Database<T> {
/** /**
* Returns true if the given message has been starred. * Returns true if the given message has been starred.
* <p> * <p>
* Locking: message read, messageFlag read. * Locking: message read.
*/ */
boolean getStarredFlag(T txn, MessageId m) throws DbException; boolean getStarredFlag(T txn, MessageId m) throws DbException;
@@ -425,7 +420,7 @@ interface Database<T> {
/** /**
* Returns the number of unread messages in each subscribed group. * Returns the number of unread messages in each subscribed group.
* <p> * <p>
* Locking: message read, messageFlag read, subscription read. * Locking: message read, subscription read.
*/ */
Map<GroupId, Integer> getUnreadMessageCounts(T txn) throws DbException; Map<GroupId, Integer> getUnreadMessageCounts(T txn) throws DbException;
@@ -439,7 +434,7 @@ interface Database<T> {
/** /**
* Returns true if any messages are sendable to the given contact. * Returns true if any messages are sendable to the given contact.
* <p> * <p>
* Locking: contact read, message read, messageStatus read. * Locking: contact read, message read.
*/ */
boolean hasSendableMessages(T txn, ContactId c) throws DbException; boolean hasSendableMessages(T txn, ContactId c) throws DbException;
@@ -447,7 +442,7 @@ interface Database<T> {
* Increments the outgoing connection counter for the given contact * Increments the outgoing connection counter for the given contact
* transport in the given rotation period and returns the old value; * transport in the given rotation period and returns the old value;
* <p> * <p>
* Locking: contact read, window write. * Locking: contact read, transport read, window write.
*/ */
long incrementConnectionCounter(T txn, ContactId c, TransportId t, long incrementConnectionCounter(T txn, ContactId c, TransportId t,
long period) throws DbException; long period) throws DbException;
@@ -470,54 +465,52 @@ interface Database<T> {
void mergeLocalProperties(T txn, TransportId t, TransportProperties p) void mergeLocalProperties(T txn, TransportId t, TransportProperties p)
throws DbException; throws DbException;
/**
* Removes outstanding messages that have been acknowledged. Any of the
* messages that are still considered outstanding (Status.SENT) with
* respect to the given contact are now considered seen (Status.SEEN).
* <p>
* Locking: contact read, message read, messageStatus write.
*/
void removeOutstandingMessages(T txn, ContactId c,
Collection<MessageId> acked) throws DbException;
/**
* Marks the given messages received from the given contact as having been
* acknowledged.
* <p>
* Locking: contact read, messageStatus write.
*/
void removeMessagesToAck(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
/** /**
* Removes a contact (and all associated state) from the database. * Removes a contact (and all associated state) from the database.
* <p> * <p>
* Locking: contact write, message write, messageFlag write, * Locking: contact write, message write, subscription write,
* messageStatus write, subscription write, transport write, window write. * transport write, window write.
*/ */
void removeContact(T txn, ContactId c) throws DbException; void removeContact(T txn, ContactId c) throws DbException;
/** /**
* Removes a message (and all associated state) from the database. * Removes a message (and all associated state) from the database.
* <p> * <p>
* Locking: contact read, message write, messageFlag write, * Locking: contact read, message write.
* messageStatus write.
*/ */
void removeMessage(T txn, MessageId m) throws DbException; void removeMessage(T txn, MessageId m) throws DbException;
/**
* Marks the given messages received from the given contact as having been
* acknowledged.
* <p>
* Locking: contact read, message write.
*/
void removeMessagesToAck(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
/**
* Removes outstanding messages that have been acknowledged. Any of the
* messages that are still considered outstanding (Status.SENT) with
* respect to the given contact are now considered seen (Status.SEEN).
* <p>
* Locking: contact read, message write.
*/
void removeOutstandingMessages(T txn, ContactId c,
Collection<MessageId> acked) throws DbException;
/** /**
* Unsubscribes from the given group. Any messages belonging to the group * Unsubscribes from the given group. Any messages belonging to the group
* are deleted from the database. * are deleted from the database.
* <p> * <p>
* Locking: contact write, message write, messageFlag write, * Locking: contact write, message write, subscription write.
* messageStatus write, subscription write.
*/ */
void removeSubscription(T txn, GroupId g) throws DbException; void removeSubscription(T txn, GroupId g) throws DbException;
/** /**
* Removes a transport (and all associated state) from the database. * Removes a transport (and all associated state) from the database.
* <p> * <p>
* Locking: contact read, transport write. * Locking: transport write.
*/ */
void removeTransport(T txn, TransportId t) throws DbException; void removeTransport(T txn, TransportId t) throws DbException;
@@ -532,7 +525,7 @@ interface Database<T> {
* Sets the connection reordering window for the given contact transport in * Sets the connection reordering window for the given contact transport in
* the given rotation period. * the given rotation period.
* <p> * <p>
* Locking: contact read, window write. * Locking: contact read, transport read, window write.
*/ */
void setConnectionWindow(T txn, ContactId c, TransportId t, long period, void setConnectionWindow(T txn, ContactId c, TransportId t, long period,
long centre, byte[] bitmap) throws DbException; long centre, byte[] bitmap) throws DbException;
@@ -555,7 +548,7 @@ interface Database<T> {
* Marks the given message read or unread and returns true if it was * Marks the given message read or unread and returns true if it was
* previously read. * previously read.
* <p> * <p>
* Locking: message read, messageFlag write. * Locking: message write.
*/ */
boolean setRead(T txn, MessageId m, boolean read) throws DbException; boolean setRead(T txn, MessageId m, boolean read) throws DbException;
@@ -581,14 +574,14 @@ interface Database<T> {
* Marks the given message starred or unstarred and returns true if it was * Marks the given message starred or unstarred and returns true if it was
* previously starred. * previously starred.
* <p> * <p>
* Locking: message read, messageFlag write. * Locking: message write.
*/ */
boolean setStarred(T txn, MessageId m, boolean starred) throws DbException; boolean setStarred(T txn, MessageId m, boolean starred) throws DbException;
/** /**
* Sets the status of the given message with respect to the given contact. * Sets the status of the given message with respect to the given contact.
* <p> * <p>
* Locking: contact read, message read, messageStatus write. * Locking: contact read, message write.
*/ */
void setStatus(T txn, ContactId c, MessageId m, Status s) void setStatus(T txn, ContactId c, MessageId m, Status s)
throws DbException; throws DbException;
@@ -599,8 +592,7 @@ interface Database<T> {
* with respect to the contact to Status.SEEN and returns true; otherwise * with respect to the contact to Status.SEEN and returns true; otherwise
* returns false. * returns false.
* <p> * <p>
* Locking: contact read, message read, messageStatus write, * Locking: contact read, message write, subscription read.
* subscription read.
*/ */
boolean setStatusSeenIfVisible(T txn, ContactId c, MessageId m) boolean setStatusSeenIfVisible(T txn, ContactId c, MessageId m)
throws DbException; throws DbException;

File diff suppressed because it is too large Load Diff

View File

@@ -92,7 +92,7 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_MESSAGES_BY_SENDABILITY = private static final String INDEX_MESSAGES_BY_SENDABILITY =
"CREATE INDEX messagesBySendability ON messages (sendability)"; "CREATE INDEX messagesBySendability ON messages (sendability)";
// Locking: contact read, messageStatus // Locking: contact read, message
private static final String CREATE_MESSAGES_TO_ACK = private static final String CREATE_MESSAGES_TO_ACK =
"CREATE TABLE messagesToAck" "CREATE TABLE messagesToAck"
+ " (messageId HASH NOT NULL," + " (messageId HASH NOT NULL,"
@@ -102,7 +102,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)" + " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)"; + " ON DELETE CASCADE)";
// Locking: contact read, message read, messageStatus // Locking: contact read, message
private static final String CREATE_STATUSES = private static final String CREATE_STATUSES =
"CREATE TABLE statuses" "CREATE TABLE statuses"
+ " (messageId HASH NOT NULL," + " (messageId HASH NOT NULL,"
@@ -122,7 +122,7 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_STATUSES_BY_CONTACT = private static final String INDEX_STATUSES_BY_CONTACT =
"CREATE INDEX statusesByContact ON statuses (contactId)"; "CREATE INDEX statusesByContact ON statuses (contactId)";
// Locking: message read, messageFlag // Locking: message
private static final String CREATE_FLAGS = private static final String CREATE_FLAGS =
"CREATE TABLE flags" "CREATE TABLE flags"
+ " (messageId HASH NOT NULL," + " (messageId HASH NOT NULL,"
@@ -252,7 +252,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES contacts (contactId)" + " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)"; + " ON DELETE CASCADE)";
// Locking: contact read, window // Locking: contact read, transport read, window
private static final String CREATE_CONTACT_TRANSPORTS = private static final String CREATE_CONTACT_TRANSPORTS =
"CREATE TABLE contactTransports" "CREATE TABLE contactTransports"
+ " (contactId INT NOT NULL," + " (contactId INT NOT NULL,"
@@ -269,7 +269,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES transports (transportId)" + " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)"; + " ON DELETE CASCADE)";
// Locking: contact read, window // Locking: contact read, transport read, window
private static final String CREATE_SECRETS = private static final String CREATE_SECRETS =
"CREATE TABLE secrets" "CREATE TABLE secrets"
+ " (contactId INT NOT NULL," + " (contactId INT NOT NULL,"

View File

@@ -29,8 +29,8 @@ import net.sf.briar.api.db.event.DatabaseEvent;
import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.MessageAddedEvent; import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.MessageReceivedEvent; import net.sf.briar.api.db.event.MessageReceivedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent;
import net.sf.briar.api.db.event.TransportsUpdatedEvent; import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection; import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.Message;
@@ -127,13 +127,13 @@ abstract class DuplexConnection implements DatabaseListener {
} else if(e instanceof MessageAddedEvent) { } else if(e instanceof MessageAddedEvent) {
if(canSendOffer.getAndSet(false)) if(canSendOffer.getAndSet(false))
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
} else if(e instanceof SubscriptionsUpdatedEvent) { } else if(e instanceof LocalSubscriptionsUpdatedEvent) {
Collection<ContactId> affected = Collection<ContactId> affected =
((SubscriptionsUpdatedEvent) e).getAffectedContacts(); ((LocalSubscriptionsUpdatedEvent) e).getAffectedContacts();
if(affected.contains(contactId)) { if(affected.contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate()); dbExecutor.execute(new GenerateSubscriptionUpdate());
} }
} else if(e instanceof TransportsUpdatedEvent) { } else if(e instanceof LocalTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdate()); dbExecutor.execute(new GenerateTransportUpdate());
} }
} }

View File

@@ -19,7 +19,7 @@ import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.MessageAddedEvent; import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.RatingChangedEvent; import net.sf.briar.api.db.event.RatingChangedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent;
import net.sf.briar.api.lifecycle.ShutdownManager; import net.sf.briar.api.lifecycle.ShutdownManager;
import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.AuthorId;
@@ -1377,7 +1377,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
oneOf(database).removeVisibility(txn, contactId1, groupId); oneOf(database).removeVisibility(txn, contactId1, groupId);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
oneOf(listener).eventOccurred(with(any( oneOf(listener).eventOccurred(with(any(
SubscriptionsUpdatedEvent.class))); LocalSubscriptionsUpdatedEvent.class)));
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
shutdown); shutdown);