Basic database support for private messages.

This commit is contained in:
akwizgran
2011-09-13 14:43:48 +01:00
parent 2858c139fa
commit de5caca578
6 changed files with 113 additions and 44 deletions

View File

@@ -63,8 +63,11 @@ public interface DatabaseComponent {
ContactId addContact(Map<String, Map<String, String>> transports,
byte[] secret) throws DbException;
/** Adds a locally generated message to the database. */
void addLocallyGeneratedMessage(Message m) throws DbException;
/** Adds a locally generated group message to the database. */
void addLocalGroupMessage(Message m) throws DbException;
/** Adds a locally generated private message to the database. */
void addLocalPrivateMessage(Message m, ContactId c) throws DbException;
/**
* Finds any lost batches that were sent to the given contact, and marks any

View File

@@ -1,6 +1,7 @@
package net.sf.briar.db;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
@@ -9,8 +10,8 @@ import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.DatabaseListener;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.Status;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Message;
@@ -175,8 +176,9 @@ DatabaseCleaner.Callback {
* <p>
* Locking: contacts read, messages write, messageStatuses write.
*/
protected boolean storeMessage(Txn txn, Message m, ContactId sender)
protected boolean storeGroupMessage(Txn txn, Message m, ContactId sender)
throws DbException {
if(m.getGroup() == null) throw new IllegalArgumentException();
boolean added = db.addMessage(txn, m);
// Mark the message as seen by the sender
MessageId id = m.getId();
@@ -198,6 +200,43 @@ DatabaseCleaner.Callback {
return added;
}
protected boolean storeMessages(Txn txn, ContactId c,
Collection<Message> messages) throws DbException {
boolean anyAdded = false;
for(Message m : messages) {
if(m.getGroup() == null) {
if(storePrivateMessage(txn, m, c, true)) anyAdded = true;
} else if(db.containsVisibleSubscription(txn, m.getGroup(), c,
m.getTimestamp())) {
if(storeGroupMessage(txn, m, c)) anyAdded = true;
}
}
return anyAdded;
}
/**
* If the given message is already in the database, returns false.
* Otherwise stores the message and marks it as new or seen with respect to
* the given contact, depending on whether the message is outgoing or
* incoming, respectively.
* <p>
* Locking: contacts read, messages write, messageStatuses write.
*/
protected boolean storePrivateMessage(Txn txn, Message m, ContactId c,
boolean incoming) throws DbException {
if(m.getGroup() != null) throw new IllegalArgumentException();
boolean added = db.addMessage(txn, m);
if(!added) return false;
MessageId id = m.getId();
if(incoming) db.setStatus(txn, c, id, Status.SEEN);
else db.setStatus(txn, c, id, Status.NEW);
// Count the bytes stored
synchronized(spaceLock) {
bytesStoredSinceLastCheck += m.getSize();
}
return true;
}
/**
* Iteratively updates the sendability of a message's ancestors to reflect
* a change in the message's sendability. Returns the number of ancestors

View File

@@ -132,7 +132,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
return c;
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
public void addLocalGroupMessage(Message m) throws DbException {
boolean added = false;
waitForPermissionToWrite();
contactLock.readLock().lock();
@@ -150,7 +150,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
// predates the subscription
if(db.containsSubscription(txn, m.getGroup(),
m.getTimestamp())) {
added = storeMessage(txn, m, null);
added = storeGroupMessage(txn, m, null);
}
db.commitTransaction(txn);
} catch(DbException e) {
@@ -173,6 +173,38 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
if(added) callListeners(Event.MESSAGES_ADDED);
}
public void addLocalPrivateMessage(Message m, ContactId c)
throws DbException {
boolean added = false;
waitForPermissionToWrite();
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
added = storePrivateMessage(txn, m, c, false);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(added) callListeners(Event.MESSAGES_ADDED);
}
public void findLostBatches(ContactId c) throws DbException {
// Find any lost batches that need to be retransmitted
Collection<BatchId> lost;
@@ -751,20 +783,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
try {
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : b.getMessages()) {
received++;
if(db.containsVisibleSubscription(txn,
m.getGroup(), c, m.getTimestamp())) {
if(storeMessage(txn, m, c)) {
anyAdded = true;
stored++;
}
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
anyAdded = storeMessages(txn, c, b.getMessages());
db.addBatchToAck(txn, c, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {

View File

@@ -109,7 +109,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
return c;
}
public void addLocallyGeneratedMessage(Message m) throws DbException {
public void addLocalGroupMessage(Message m) throws DbException {
boolean added = false;
waitForPermissionToWrite();
synchronized(contactLock) {
@@ -123,7 +123,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
// predates the subscription
if(db.containsSubscription(txn, m.getGroup(),
m.getTimestamp())) {
added = storeMessage(txn, m, null);
added = storeGroupMessage(txn, m, null);
if(!added) {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Duplicate local message");
@@ -145,6 +145,28 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
if(added) callListeners(Event.MESSAGES_ADDED);
}
public void addLocalPrivateMessage(Message m, ContactId c)
throws DbException {
boolean added = false;
waitForPermissionToWrite();
synchronized(contactLock) {
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
added = storePrivateMessage(txn, m, c, false);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
// Call the listeners outside the lock
if(added) callListeners(Event.MESSAGES_ADDED);
}
public void findLostBatches(ContactId c) throws DbException {
// Find any lost batches that need to be retransmitted
Collection<BatchId> lost;
@@ -574,21 +596,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : b.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsVisibleSubscription(txn, g, c,
m.getTimestamp())) {
if(storeMessage(txn, m, c)) {
anyAdded = true;
stored++;
}
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
anyAdded = storeMessages(txn, c, b.getMessages());
db.addBatchToAck(txn, c, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {

View File

@@ -123,8 +123,8 @@ class MessageReader implements ObjectReader<Message> {
messageDigest.reset();
messageDigest.update(raw);
MessageId id = new MessageId(messageDigest.digest());
AuthorId authorId = author == null ? null : author.getId();
GroupId groupId = group == null ? null : group.getId();
AuthorId authorId = author == null ? null : author.getId();
return new MessageImpl(id, parent, groupId, authorId, timestamp, raw);
}
}

View File

@@ -376,7 +376,7 @@ public abstract class DatabaseComponentTest extends TestCase {
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addLocallyGeneratedMessage(message);
db.addLocalGroupMessage(message);
context.assertIsSatisfied();
}
@@ -399,7 +399,7 @@ public abstract class DatabaseComponentTest extends TestCase {
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addLocallyGeneratedMessage(message);
db.addLocalGroupMessage(message);
context.assertIsSatisfied();
}
@@ -431,7 +431,7 @@ public abstract class DatabaseComponentTest extends TestCase {
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addLocallyGeneratedMessage(message);
db.addLocalGroupMessage(message);
context.assertIsSatisfied();
}
@@ -467,7 +467,7 @@ public abstract class DatabaseComponentTest extends TestCase {
}});
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addLocallyGeneratedMessage(message);
db.addLocalGroupMessage(message);
context.assertIsSatisfied();
}
@@ -1132,7 +1132,7 @@ public abstract class DatabaseComponentTest extends TestCase {
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addListener(listener);
db.addLocallyGeneratedMessage(message);
db.addLocalGroupMessage(message);
context.assertIsSatisfied();
}
@@ -1158,7 +1158,7 @@ public abstract class DatabaseComponentTest extends TestCase {
DatabaseComponent db = createDatabaseComponent(database, cleaner);
db.addListener(listener);
db.addLocallyGeneratedMessage(message);
db.addLocalGroupMessage(message);
context.assertIsSatisfied();
}