Moved subscription updates to the client layer.

This commit is contained in:
akwizgran
2016-02-01 15:13:36 +00:00
parent 54272c8836
commit 18db17bf5b
30 changed files with 1286 additions and 646 deletions

View File

@@ -598,13 +598,6 @@ interface Database<T> {
void setReorderingWindow(T txn, ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException;
/**
* Makes a group visible or invisible to future contacts by default.
* <p>
* Locking: write.
*/
void setVisibleToAll(T txn, GroupId g, boolean all) throws DbException;
/**
* Updates the transmission count and expiry time of the given message
* with respect to the given contact, using the latency of the transport

View File

@@ -223,6 +223,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (added) {
eventBus.broadcast(new MessageAddedEvent(m, null));
eventBus.broadcast(new MessageValidatedEvent(m, c, true, true));
if (shared) eventBus.broadcast(new MessageSharedEvent(m));
}
}
@@ -801,6 +802,28 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
public boolean isVisibleToContact(ContactId c, GroupId g)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (!db.containsGroup(txn, g))
throw new NoSuchGroupException();
boolean visible = db.containsVisibleGroup(txn, c, g);
db.commitTransaction(txn);
return visible;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public void mergeGroupMetadata(GroupId g, Metadata meta)
throws DbException {
lock.writeLock().lock();
@@ -900,7 +923,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
duplicate = db.containsMessage(txn, m.getId());
visible = db.containsVisibleGroup(txn, c, m.getGroupId());
if (visible) {
if (!duplicate) addMessage(txn, m, UNKNOWN, true, c);
if (!duplicate) addMessage(txn, m, UNKNOWN, false, c);
db.raiseAckFlag(txn, c, m.getId());
}
db.commitTransaction(txn);
@@ -1162,7 +1185,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsGroup(txn, g))
throw new NoSuchGroupException();
// Use HashSets for O(1) lookups, O(n) overall running time
HashSet<ContactId> now = new HashSet<ContactId>(visible);
Collection<ContactId> now = new HashSet<ContactId>(visible);
Collection<ContactId> before = db.getVisibility(txn, g);
before = new HashSet<ContactId>(before);
// Set the group's visibility for each current contact
@@ -1177,8 +1200,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
affected.add(c);
}
}
// Make the group invisible to future contacts
db.setVisibleToAll(txn, g, false);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
@@ -1191,27 +1212,20 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
eventBus.broadcast(new GroupVisibilityUpdatedEvent(affected));
}
public void setVisibleToAll(GroupId g, boolean all) throws DbException {
Collection<ContactId> affected = new ArrayList<ContactId>();
public void setVisibleToContact(ContactId c, GroupId g, boolean visible)
throws DbException {
boolean wasVisible = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (!db.containsGroup(txn, g))
throw new NoSuchGroupException();
// Make the group visible or invisible to future contacts
db.setVisibleToAll(txn, g, all);
if (all) {
// Make the group visible to all current contacts
Collection<ContactId> before = db.getVisibility(txn, g);
before = new HashSet<ContactId>(before);
for (ContactId c : db.getContactIds(txn)) {
if (!before.contains(c)) {
db.addVisibility(txn, c, g);
affected.add(c);
}
}
}
wasVisible = db.containsVisibleGroup(txn, c, g);
if (visible && !wasVisible) db.addVisibility(txn, c, g);
else if (!visible && wasVisible) db.removeVisibility(txn, c, g);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
@@ -1220,8 +1234,10 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
} finally {
lock.writeLock().unlock();
}
if (!affected.isEmpty())
eventBus.broadcast(new GroupVisibilityUpdatedEvent(affected));
if (visible != wasVisible) {
eventBus.broadcast(new GroupVisibilityUpdatedEvent(
Collections.singletonList(c)));
}
}
public void updateTransportKeys(Map<ContactId, TransportKeys> keys)

View File

@@ -66,8 +66,8 @@ import static org.briarproject.db.ExponentialBackoff.calculateExpiry;
*/
abstract class JdbcDatabase implements Database<Connection> {
private static final int SCHEMA_VERSION = 19;
private static final int MIN_SCHEMA_VERSION = 19;
private static final int SCHEMA_VERSION = 20;
private static final int MIN_SCHEMA_VERSION = 20;
private static final String CREATE_SETTINGS =
"CREATE TABLE settings"
@@ -104,7 +104,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " (groupId HASH NOT NULL,"
+ " clientId HASH NOT NULL,"
+ " descriptor BINARY NOT NULL,"
+ " visibleToAll BOOLEAN NOT NULL,"
+ " PRIMARY KEY (groupId))";
private static final String CREATE_GROUP_METADATA =
@@ -511,30 +510,6 @@ abstract class JdbcDatabase implements Database<Connection> {
if (rows != 1) throw new DbStateException();
ps.close();
}
// Make groups that are visible to everyone visible to this contact
sql = "SELECT groupId FROM groups WHERE visibleToAll = TRUE";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
ids = new ArrayList<byte[]>();
while (rs.next()) ids.add(rs.getBytes(1));
rs.close();
ps.close();
if (!ids.isEmpty()) {
sql = "INSERT INTO groupVisibilities (contactId, groupId)"
+ " VALUES (?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for (byte[] id : ids) {
ps.setBytes(2, id);
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != ids.size())
throw new DbStateException();
for (int rows : batchAffected)
if (rows != 1) throw new DbStateException();
ps.close();
}
return c;
} catch (SQLException e) {
tryToClose(rs);
@@ -546,9 +521,8 @@ abstract class JdbcDatabase implements Database<Connection> {
public void addGroup(Connection txn, Group g) throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO groups"
+ " (groupId, clientId, descriptor, visibleToAll)"
+ " VALUES (?, ?, ?, FALSE)";
String sql = "INSERT INTO groups (groupId, clientId, descriptor)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, g.getId().getBytes());
ps.setBytes(2, g.getClientId().getBytes());
@@ -2137,23 +2111,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setVisibleToAll(Connection txn, GroupId g, boolean all)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE groups SET visibleToAll = ? WHERE groupId = ?";
ps = txn.prepareStatement(sql);
ps.setBoolean(1, all);
ps.setBytes(2, g.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void updateExpiryTime(Connection txn, ContactId c, MessageId m,
int maxLatency) throws DbException {
PreparedStatement ps = null;

View File

@@ -0,0 +1,69 @@
package org.briarproject.forum;
import org.briarproject.api.FormatException;
import org.briarproject.api.data.BdfDictionary;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageValidator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.logging.Logger;
import static org.briarproject.api.forum.ForumConstants.FORUM_SALT_LENGTH;
import static org.briarproject.api.forum.ForumConstants.MAX_FORUM_NAME_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
class ForumListValidator implements MessageValidator {
private static final Logger LOG =
Logger.getLogger(ForumListValidator.class.getName());
private final BdfReaderFactory bdfReaderFactory;
private final MetadataEncoder metadataEncoder;
ForumListValidator(BdfReaderFactory bdfReaderFactory,
MetadataEncoder metadataEncoder) {
this.bdfReaderFactory = bdfReaderFactory;
this.metadataEncoder = metadataEncoder;
}
@Override
public Metadata validateMessage(Message m, Group g) {
try {
// Parse the message body
byte[] raw = m.getRaw();
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
r.readListStart();
long version = r.readInteger();
if (version < 0) throw new FormatException();
r.readListStart();
while (!r.hasListEnd()) {
r.readListStart();
String name = r.readString(MAX_FORUM_NAME_LENGTH);
if (name.length() == 0) throw new FormatException();
byte[] salt = r.readRaw(FORUM_SALT_LENGTH);
if (salt.length != FORUM_SALT_LENGTH)
throw new FormatException();
r.readListEnd();
}
r.readListEnd();
r.readListEnd();
if (!r.eof()) throw new FormatException();
// Return the metadata
BdfDictionary d = new BdfDictionary();
d.put("version", version);
d.put("local", false);
return metadataEncoder.encode(d);
} catch (IOException e) {
LOG.info("Invalid forum list");
return null;
}
}
}

View File

@@ -4,13 +4,10 @@ import com.google.inject.Inject;
import org.briarproject.api.FormatException;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.data.BdfDictionary;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.BdfWriter;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.data.MetadataParser;
import org.briarproject.api.db.DatabaseComponent;
@@ -25,15 +22,12 @@ import org.briarproject.api.identity.AuthorId;
import org.briarproject.api.identity.LocalAuthor;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.MessageId;
import org.briarproject.util.StringUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -42,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
@@ -63,25 +58,23 @@ class ForumManagerImpl implements ForumManager {
Logger.getLogger(ForumManagerImpl.class.getName());
private final DatabaseComponent db;
private final GroupFactory groupFactory;
private final ContactManager contactManager;
private final BdfReaderFactory bdfReaderFactory;
private final BdfWriterFactory bdfWriterFactory;
private final MetadataEncoder metadataEncoder;
private final MetadataParser metadataParser;
private final SecureRandom random;
/** Ensures isolation between database reads and writes. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Inject
ForumManagerImpl(CryptoComponent crypto, DatabaseComponent db,
GroupFactory groupFactory, BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory, MetadataEncoder metadataEncoder,
ForumManagerImpl(DatabaseComponent db, ContactManager contactManager,
BdfReaderFactory bdfReaderFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser) {
this.db = db;
this.groupFactory = groupFactory;
this.contactManager = contactManager;
this.bdfReaderFactory = bdfReaderFactory;
this.bdfWriterFactory = bdfWriterFactory;
this.metadataEncoder = metadataEncoder;
this.metadataParser = metadataParser;
random = crypto.getSecureRandom();
}
@Override
@@ -89,62 +82,154 @@ class ForumManagerImpl implements ForumManager {
return CLIENT_ID;
}
@Override
public Forum createForum(String name) {
int length = StringUtils.toUtf8(name).length;
if (length == 0) throw new IllegalArgumentException();
if (length > MAX_FORUM_NAME_LENGTH)
throw new IllegalArgumentException();
byte[] salt = new byte[FORUM_SALT_LENGTH];
random.nextBytes(salt);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BdfWriter w = bdfWriterFactory.createWriter(out);
try {
w.writeListStart();
w.writeString(name);
w.writeRaw(salt);
w.writeListEnd();
} catch (IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException(e);
}
Group g = groupFactory.createGroup(CLIENT_ID, out.toByteArray());
return new Forum(g, name);
}
@Override
public void addForum(Forum f) throws DbException {
db.addGroup(f.getGroup());
}
@Override
public void addLocalPost(ForumPost p) throws DbException {
BdfDictionary d = new BdfDictionary();
d.put("timestamp", p.getMessage().getTimestamp());
if (p.getParent() != null) d.put("parent", p.getParent().getBytes());
if (p.getAuthor() != null) {
Author a = p.getAuthor();
BdfDictionary d1 = new BdfDictionary();
d1.put("id", a.getId().getBytes());
d1.put("name", a.getName());
d1.put("publicKey", a.getPublicKey());
d.put("author", d1);
}
d.put("contentType", p.getContentType());
d.put("local", true);
d.put("read", true);
lock.writeLock().lock();
try {
BdfDictionary d = new BdfDictionary();
d.put("timestamp", p.getMessage().getTimestamp());
if (p.getParent() != null)
d.put("parent", p.getParent().getBytes());
if (p.getAuthor() != null) {
Author a = p.getAuthor();
BdfDictionary d1 = new BdfDictionary();
d1.put("id", a.getId().getBytes());
d1.put("name", a.getName());
d1.put("publicKey", a.getPublicKey());
d.put("author", d1);
}
d.put("contentType", p.getContentType());
d.put("local", true);
d.put("read", true);
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(p.getMessage(), CLIENT_ID, meta, true);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
throw new RuntimeException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Collection<Forum> getAvailableForums() throws DbException {
// TODO
return Collections.emptyList();
public Forum getForum(GroupId g) throws DbException {
lock.readLock().lock();
try {
return parseForum(db.getGroup(g));
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<Forum> getForums() throws DbException {
lock.readLock().lock();
try {
List<Forum> forums = new ArrayList<Forum>();
for (Group g : db.getGroups(CLIENT_ID)) forums.add(parseForum(g));
return Collections.unmodifiableList(forums);
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public byte[] getPostBody(MessageId m) throws DbException {
lock.readLock().lock();
try {
byte[] raw = db.getRawMessage(m);
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
r.readListStart();
if (r.hasRaw()) r.skipRaw(); // Parent ID
else r.skipNull(); // No parent
if (r.hasList()) r.skipList(); // Author
else r.skipNull(); // No author
r.skipString(); // Content type
byte[] postBody = r.readRaw(MAX_FORUM_POST_BODY_LENGTH);
if (r.hasRaw()) r.skipRaw(); // Signature
else r.skipNull();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return postBody;
} catch (FormatException e) {
throw new DbException(e);
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<ForumPostHeader> getPostHeaders(GroupId g)
throws DbException {
lock.readLock().lock();
try {
// Load the IDs of the user's identities
Set<AuthorId> localAuthorIds = new HashSet<AuthorId>();
for (LocalAuthor a : db.getLocalAuthors())
localAuthorIds.add(a.getId());
// Load the IDs of contacts' identities
Set<AuthorId> contactAuthorIds = new HashSet<AuthorId>();
for (Contact c : contactManager.getContacts())
contactAuthorIds.add(c.getAuthor().getId());
// Load and parse the metadata
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Collection<ForumPostHeader> headers =
new ArrayList<ForumPostHeader>();
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
MessageId messageId = e.getKey();
Metadata meta = e.getValue();
try {
BdfDictionary d = metadataParser.parse(meta);
long timestamp = d.getInteger("timestamp");
Author author = null;
Author.Status authorStatus = ANONYMOUS;
BdfDictionary d1 = d.getDictionary("author", null);
if (d1 != null) {
AuthorId authorId = new AuthorId(d1.getRaw("id"));
String name = d1.getString("name");
byte[] publicKey = d1.getRaw("publicKey");
author = new Author(authorId, name, publicKey);
if (localAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else if (contactAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else authorStatus = UNKNOWN;
}
String contentType = d.getString("contentType");
boolean read = d.getBoolean("read");
headers.add(new ForumPostHeader(messageId, timestamp,
author, authorStatus, contentType, read));
} catch (FormatException ex) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, ex.toString(), ex);
}
}
return headers;
} finally {
lock.readLock().unlock();
}
}
@Override
public void setReadFlag(MessageId m, boolean read) throws DbException {
lock.writeLock().lock();
try {
BdfDictionary d = new BdfDictionary();
d.put("read", read);
db.mergeMessageMetadata(m, metadataEncoder.encode(d));
} catch (FormatException e) {
throw new RuntimeException(e);
} finally {
lock.writeLock().unlock();
}
}
private Forum parseForum(Group g) throws FormatException {
@@ -153,12 +238,10 @@ class ForumManagerImpl implements ForumManager {
try {
r.readListStart();
String name = r.readString(MAX_FORUM_NAME_LENGTH);
if (name.length() == 0) throw new FormatException();
byte[] salt = r.readRaw(FORUM_SALT_LENGTH);
if (salt.length != FORUM_SALT_LENGTH) throw new FormatException();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return new Forum(g, name);
return new Forum(g, name, salt);
} catch (FormatException e) {
throw e;
} catch (IOException e) {
@@ -166,137 +249,4 @@ class ForumManagerImpl implements ForumManager {
throw new RuntimeException(e);
}
}
@Override
public Forum getForum(GroupId g) throws DbException {
Group group = db.getGroup(g);
if (!group.getClientId().equals(CLIENT_ID))
throw new IllegalArgumentException();
try {
return parseForum(group);
} catch (FormatException e) {
throw new IllegalArgumentException();
}
}
@Override
public Collection<Forum> getForums() throws DbException {
Collection<Group> groups = db.getGroups(CLIENT_ID);
List<Forum> forums = new ArrayList<Forum>(groups.size());
for (Group g : groups) {
try {
forums.add(parseForum(g));
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
return Collections.unmodifiableList(forums);
}
@Override
public byte[] getPostBody(MessageId m) throws DbException {
byte[] raw = db.getRawMessage(m);
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
try {
// Extract the forum post body
r.readListStart();
if (r.hasRaw()) r.skipRaw(); // Parent ID
else r.skipNull(); // No parent
if (r.hasList()) r.skipList(); // Author
else r.skipNull(); // No author
r.skipString(); // Content type
return r.readRaw(MAX_FORUM_POST_BODY_LENGTH);
} catch (FormatException e) {
// Not a valid forum post
throw new IllegalArgumentException();
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);
}
}
@Override
public Collection<ForumPostHeader> getPostHeaders(GroupId g)
throws DbException {
// Load the IDs of the user's own identities and contacts' identities
Set<AuthorId> localAuthorIds = new HashSet<AuthorId>();
for (LocalAuthor a : db.getLocalAuthors())
localAuthorIds.add(a.getId());
Set<AuthorId> contactAuthorIds = new HashSet<AuthorId>();
for (Contact c : db.getContacts())
contactAuthorIds.add(c.getAuthor().getId());
// Load and parse the metadata
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
Collection<ForumPostHeader> headers = new ArrayList<ForumPostHeader>();
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
MessageId messageId = e.getKey();
Metadata meta = e.getValue();
try {
BdfDictionary d = metadataParser.parse(meta);
long timestamp = d.getInteger("timestamp");
Author author = null;
Author.Status authorStatus = ANONYMOUS;
BdfDictionary d1 = d.getDictionary("author", null);
if (d1 != null) {
AuthorId authorId = new AuthorId(d1.getRaw("id"));
String name = d1.getString("name");
byte[] publicKey = d1.getRaw("publicKey");
author = new Author(authorId, name, publicKey);
if (localAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else if (contactAuthorIds.contains(authorId))
authorStatus = VERIFIED;
else authorStatus = UNKNOWN;
}
String contentType = d.getString("contentType");
boolean read = d.getBoolean("read");
headers.add(new ForumPostHeader(messageId, timestamp, author,
authorStatus, contentType, read));
} catch (FormatException ex) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, ex.toString(), ex);
}
}
return headers;
}
@Override
public Collection<Contact> getSubscribers(GroupId g) throws DbException {
// TODO
return Collections.emptyList();
}
@Override
public Collection<ContactId> getVisibility(GroupId g) throws DbException {
return db.getVisibility(g);
}
@Override
public void removeForum(Forum f) throws DbException {
db.removeGroup(f.getGroup());
}
@Override
public void setReadFlag(MessageId m, boolean read) throws DbException {
BdfDictionary d = new BdfDictionary();
d.put("read", read);
try {
db.mergeMessageMetadata(m, metadataEncoder.encode(d));
} catch (FormatException e) {
throw new RuntimeException(e);
}
}
@Override
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
db.setVisibility(g, visible);
}
@Override
public void setVisibleToAll(GroupId g, boolean all) throws DbException {
db.setVisibleToAll(g, all);
}
}

View File

@@ -3,39 +3,63 @@ package org.briarproject.forum;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.forum.ForumManager;
import org.briarproject.api.forum.ForumPostFactory;
import org.briarproject.api.forum.ForumSharingManager;
import org.briarproject.api.identity.Author;
import org.briarproject.api.sync.ValidationManager;
import org.briarproject.api.system.Clock;
import javax.inject.Singleton;
import static org.briarproject.forum.ForumManagerImpl.CLIENT_ID;
public class ForumModule extends AbstractModule {
@Override
protected void configure() {
bind(ForumManager.class).to(ForumManagerImpl.class);
bind(ForumManager.class).to(ForumManagerImpl.class).in(Singleton.class);
bind(ForumPostFactory.class).to(ForumPostFactoryImpl.class);
}
@Provides @Singleton
ForumPostValidator getValidator(ValidationManager validationManager,
CryptoComponent crypto, BdfReaderFactory bdfReaderFactory,
ForumPostValidator getForumPostValidator(
ValidationManager validationManager, CryptoComponent crypto,
BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory,
ObjectReader<Author> authorReader, MetadataEncoder metadataEncoder,
Clock clock) {
ForumPostValidator validator = new ForumPostValidator(crypto,
bdfReaderFactory, bdfWriterFactory, authorReader,
metadataEncoder, clock);
validationManager.registerMessageValidator(CLIENT_ID, validator);
validationManager.registerMessageValidator(
ForumManagerImpl.CLIENT_ID, validator);
return validator;
}
@Provides @Singleton
ForumListValidator getForumListValidator(
ValidationManager validationManager,
BdfReaderFactory bdfReaderFactory,
MetadataEncoder metadataEncoder) {
ForumListValidator validator = new ForumListValidator(bdfReaderFactory,
metadataEncoder);
validationManager.registerMessageValidator(
ForumSharingManagerImpl.CLIENT_ID, validator);
return validator;
}
@Provides @Singleton
ForumSharingManager getForumSharingManager(ContactManager contactManager,
EventBus eventBus, ForumSharingManagerImpl forumSharingManager) {
contactManager.registerAddContactHook(forumSharingManager);
contactManager.registerRemoveContactHook(forumSharingManager);
eventBus.addListener(forumSharingManager);
return forumSharingManager;
}
}

View File

@@ -15,6 +15,7 @@ import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.data.ObjectReader;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.identity.Author;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageValidator;
@@ -26,8 +27,6 @@ import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.logging.Logger;
import javax.inject.Inject;
import static org.briarproject.api.forum.ForumConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.forum.ForumConstants.MAX_FORUM_POST_BODY_LENGTH;
import static org.briarproject.api.identity.AuthorConstants.MAX_SIGNATURE_LENGTH;
@@ -47,7 +46,6 @@ class ForumPostValidator implements MessageValidator {
private final Clock clock;
private final KeyParser keyParser;
@Inject
ForumPostValidator(CryptoComponent crypto,
BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory,
@@ -63,7 +61,7 @@ class ForumPostValidator implements MessageValidator {
}
@Override
public Metadata validateMessage(Message m) {
public Metadata validateMessage(Message m, Group g) {
// Reject the message if it's too far in the future
long now = clock.currentTimeMillis();
if (m.getTimestamp() - now > MAX_CLOCK_DIFFERENCE) {
@@ -78,8 +76,7 @@ class ForumPostValidator implements MessageValidator {
BdfReader r = bdfReaderFactory.createReader(in);
MessageId parent = null;
Author author = null;
String contentType;
byte[] postBody, sig = null;
byte[] sig = null;
r.readListStart();
// Read the parent ID, if any
if (r.hasRaw()) {
@@ -93,9 +90,9 @@ class ForumPostValidator implements MessageValidator {
if (r.hasList()) author = authorReader.readObject(r);
else r.readNull();
// Read the content type
contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
String contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
// Read the forum post body
postBody = r.readRaw(MAX_FORUM_POST_BODY_LENGTH);
byte[] postBody = r.readRaw(MAX_FORUM_POST_BODY_LENGTH);
// Read the signature, if any
if (r.hasRaw()) sig = r.readRaw(MAX_SIGNATURE_LENGTH);

View File

@@ -0,0 +1,558 @@
package org.briarproject.forum;
import com.google.inject.Inject;
import org.briarproject.api.FormatException;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.contact.ContactManager.AddContactHook;
import org.briarproject.api.contact.ContactManager.RemoveContactHook;
import org.briarproject.api.data.BdfDictionary;
import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.BdfWriter;
import org.briarproject.api.data.BdfWriterFactory;
import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.data.MetadataParser;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DatabaseExecutor;
import org.briarproject.api.db.DbException;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.MessageValidatedEvent;
import org.briarproject.api.forum.Forum;
import org.briarproject.api.forum.ForumManager;
import org.briarproject.api.forum.ForumSharingManager;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupFactory;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageFactory;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.PrivateGroupFactory;
import org.briarproject.api.system.Clock;
import org.briarproject.util.StringUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.forum.ForumConstants.FORUM_SALT_LENGTH;
import static org.briarproject.api.forum.ForumConstants.MAX_FORUM_NAME_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
class ForumSharingManagerImpl implements ForumSharingManager, AddContactHook,
RemoveContactHook, EventListener {
static final ClientId CLIENT_ID = new ClientId(StringUtils.fromHexString(
"cd11a5d04dccd9e2931d6fc3df456313"
+ "63bb3e9d9d0e9405fccdb051f41f5449"));
private static final byte[] LOCAL_GROUP_DESCRIPTOR = new byte[0];
private static final Logger LOG =
Logger.getLogger(ForumSharingManagerImpl.class.getName());
private final DatabaseComponent db;
private final Executor dbExecutor;
private final ContactManager contactManager;
private final ForumManager forumManager;
private final GroupFactory groupFactory;
private final PrivateGroupFactory privateGroupFactory;
private final MessageFactory messageFactory;
private final BdfReaderFactory bdfReaderFactory;
private final BdfWriterFactory bdfWriterFactory;
private final MetadataEncoder metadataEncoder;
private final MetadataParser metadataParser;
private final SecureRandom random;
private final Clock clock;
private final Group localGroup;
/** Ensures isolation between database reads and writes. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Inject
ForumSharingManagerImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor,
ContactManager contactManager, ForumManager forumManager,
GroupFactory groupFactory, PrivateGroupFactory privateGroupFactory,
MessageFactory messageFactory, BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser, SecureRandom random, Clock clock) {
this.db = db;
this.dbExecutor = dbExecutor;
this.contactManager = contactManager;
this.forumManager = forumManager;
this.groupFactory = groupFactory;
this.privateGroupFactory = privateGroupFactory;
this.messageFactory = messageFactory;
this.bdfReaderFactory = bdfReaderFactory;
this.bdfWriterFactory = bdfWriterFactory;
this.metadataEncoder = metadataEncoder;
this.metadataParser = metadataParser;
this.random = random;
this.clock = clock;
localGroup = groupFactory.createGroup(CLIENT_ID,
LOCAL_GROUP_DESCRIPTOR);
}
@Override
public void addingContact(ContactId c) {
lock.writeLock().lock();
try {
// Create a group to share with the contact
Group g = getContactGroup(db.getContact(c));
// Store the group and share it with the contact
db.addGroup(g);
db.setVisibility(g.getId(), Collections.singletonList(c));
// Attach the contact ID to the group
BdfDictionary d = new BdfDictionary();
d.put("contactId", c.getInt());
db.mergeGroupMetadata(g.getId(), metadataEncoder.encode(d));
// Share any forums that are shared with all contacts
List<Forum> shared = getForumsSharedWithAllContacts();
storeMessage(g.getId(), shared, 0);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removingContact(ContactId c) {
lock.writeLock().lock();
try {
db.removeGroup(getContactGroup(db.getContact(c)));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void eventOccurred(Event e) {
if (e instanceof MessageValidatedEvent) {
MessageValidatedEvent m = (MessageValidatedEvent) e;
ClientId c = m.getClientId();
if (m.isValid() && !m.isLocal() && c.equals(CLIENT_ID))
remoteForumsUpdated(m.getMessage().getGroupId());
}
}
@Override
public ClientId getClientId() {
return CLIENT_ID;
}
@Override
public Forum createForum(String name) {
int length = StringUtils.toUtf8(name).length;
if (length == 0) throw new IllegalArgumentException();
if (length > MAX_FORUM_NAME_LENGTH)
throw new IllegalArgumentException();
byte[] salt = new byte[FORUM_SALT_LENGTH];
random.nextBytes(salt);
return createForum(name, salt);
}
@Override
public void addForum(Forum f) throws DbException {
lock.writeLock().lock();
try {
db.addGroup(f.getGroup());
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removeForum(Forum f) throws DbException {
lock.writeLock().lock();
try {
// Update the list of forums shared with each contact
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
removeFromList(contactGroup.getId(), f);
}
db.removeGroup(f.getGroup());
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Collection<Forum> getAvailableForums() throws DbException {
lock.readLock().lock();
try {
// Get any forums we subscribe to
Set<Group> subscribed = new HashSet<Group>(db.getGroups(
forumManager.getClientId()));
// Get all forums shared by contacts
Set<Forum> available = new HashSet<Forum>();
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
// Find the latest update version
LatestUpdate latest = findLatest(g.getId(), false);
if (latest != null) {
// Retrieve and parse the latest update
byte[] raw = db.getRawMessage(latest.messageId);
for (Forum f : parseForumList(raw)) {
if (!subscribed.contains(f.getGroup()))
available.add(f);
}
}
}
return Collections.unmodifiableSet(available);
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<Contact> getSharedBy(GroupId g) throws DbException {
lock.readLock().lock();
try {
List<Contact> subscribers = new ArrayList<Contact>();
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (listContains(contactGroup.getId(), g, false))
subscribers.add(c);
}
return Collections.unmodifiableList(subscribers);
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Collection<ContactId> getSharedWith(GroupId g) throws DbException {
lock.readLock().lock();
try {
List<ContactId> shared = new ArrayList<ContactId>();
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (listContains(contactGroup.getId(), g, true))
shared.add(c.getId());
}
return Collections.unmodifiableList(shared);
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public void setSharedWith(GroupId g, Collection<ContactId> shared)
throws DbException {
lock.writeLock().lock();
try {
// Retrieve the forum
Forum f = parseForum(db.getGroup(g));
// Remove the forum from the list of forums shared with all contacts
removeFromList(localGroup.getId(), f);
// Update the list of forums shared with each contact
shared = new HashSet<ContactId>(shared);
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (shared.contains(c.getId())) {
if (addToList(contactGroup.getId(), f)) {
// If the contact is sharing the forum, make it visible
if (listContains(contactGroup.getId(), g, false))
db.setVisibleToContact(c.getId(), g, true);
}
} else {
removeFromList(contactGroup.getId(), f);
db.setVisibleToContact(c.getId(), g, false);
}
}
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void setSharedWithAll(GroupId g) throws DbException {
lock.writeLock().lock();
try {
// Retrieve the forum
Forum f = parseForum(db.getGroup(g));
// Add the forum to the list of forums shared with all contacts
addToList(localGroup.getId(), f);
// Add the forum to the list of forums shared with each contact
for (Contact c : contactManager.getContacts()) {
Group contactGroup = getContactGroup(c);
if (addToList(contactGroup.getId(), f)) {
// If the contact is sharing the forum, make it visible
if (listContains(contactGroup.getId(), g, false))
db.setVisibleToContact(getContactId(g), g, true);
}
}
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
private Group getContactGroup(Contact c) {
return privateGroupFactory.createPrivateGroup(CLIENT_ID, c);
}
// Locking: lock.writeLock
private List<Forum> getForumsSharedWithAllContacts() throws DbException,
FormatException {
// Ensure the local group exists
db.addGroup(localGroup);
// Find the latest update in the local group
LatestUpdate latest = findLatest(localGroup.getId(), true);
if (latest == null) return Collections.emptyList();
// Retrieve and parse the latest update
return parseForumList(db.getRawMessage(latest.messageId));
}
// Locking: lock.readLock
private LatestUpdate findLatest(GroupId g, boolean local)
throws DbException, FormatException {
LatestUpdate latest = null;
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getBoolean("local") != local) continue;
long version = d.getInteger("version");
if (latest == null || version > latest.version)
latest = new LatestUpdate(e.getKey(), version);
}
return latest;
}
private List<Forum> parseForumList(byte[] raw) throws FormatException {
List<Forum> forums = new ArrayList<Forum>();
ByteArrayInputStream in = new ByteArrayInputStream(raw,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
try {
r.readListStart();
r.skipInteger(); // Version
r.readListStart();
while (!r.hasListEnd()) {
r.readListStart();
String name = r.readString(MAX_FORUM_NAME_LENGTH);
byte[] salt = r.readRaw(FORUM_SALT_LENGTH);
r.readListEnd();
forums.add(createForum(name, salt));
}
r.readListEnd();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return forums;
} catch (FormatException e) {
throw e;
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);
}
}
// Locking: lock.writeLock
private void storeMessage(GroupId g, List<Forum> forums, long version)
throws DbException, FormatException {
byte[] body = encodeForumList(forums, version);
long now = clock.currentTimeMillis();
Message m = messageFactory.createMessage(g, now, body);
BdfDictionary d = new BdfDictionary();
d.put("version", version);
d.put("local", true);
db.addLocalMessage(m, CLIENT_ID, metadataEncoder.encode(d), true);
}
private byte[] encodeForumList(List<Forum> forums, long version) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
BdfWriter w = bdfWriterFactory.createWriter(out);
try {
w.writeListStart();
w.writeInteger(version);
w.writeListStart();
for (Forum f : forums) {
w.writeListStart();
w.writeString(f.getName());
w.writeRaw(f.getSalt());
w.writeListEnd();
}
w.writeListEnd();
w.writeListEnd();
} catch (IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException(e);
}
return out.toByteArray();
}
private void remoteForumsUpdated(final GroupId g) {
dbExecutor.execute(new Runnable() {
public void run() {
lock.writeLock().lock();
try {
setForumVisibility(getContactId(g), getVisibleForums(g));
} catch (DbException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
}
});
}
// Locking: lock.readLock
private ContactId getContactId(GroupId contactGroupId) throws DbException,
FormatException {
Metadata meta = db.getGroupMetadata(contactGroupId);
BdfDictionary d = metadataParser.parse(meta);
int id = d.getInteger("contactId").intValue();
return new ContactId(id);
}
// Locking: lock.readLock
private Set<GroupId> getVisibleForums(GroupId contactGroupId)
throws DbException, FormatException {
// Get the latest local and remote updates
LatestUpdate local = findLatest(contactGroupId, true);
LatestUpdate remote = findLatest(contactGroupId, false);
// If there's no local and/or remote update, no forums are visible
if (local == null || remote == null) return Collections.emptySet();
// Intersect the sets of shared forums
byte[] localRaw = db.getRawMessage(local.messageId);
Set<Forum> shared = new HashSet<Forum>(parseForumList(localRaw));
byte[] remoteRaw = db.getRawMessage(remote.messageId);
shared.retainAll(parseForumList(remoteRaw));
// Forums in the intersection should be visible
Set<GroupId> visible = new HashSet<GroupId>(shared.size());
for (Forum f : shared) visible.add(f.getId());
return visible;
}
// Locking: lock.writeLock
private void setForumVisibility(ContactId c, Set<GroupId> visible)
throws DbException {
for (Group g : db.getGroups(forumManager.getClientId())) {
boolean isVisible = db.isVisibleToContact(c, g.getId());
boolean shouldBeVisible = visible.contains(g.getId());
if (isVisible && !shouldBeVisible)
db.setVisibleToContact(c, g.getId(), false);
else if (!isVisible && shouldBeVisible)
db.setVisibleToContact(c, g.getId(), true);
}
}
private Forum createForum(String name, byte[] salt) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
BdfWriter w = bdfWriterFactory.createWriter(out);
try {
w.writeListStart();
w.writeString(name);
w.writeRaw(salt);
w.writeListEnd();
} catch (IOException e) {
// Shouldn't happen with ByteArrayOutputStream
throw new RuntimeException(e);
}
Group g = groupFactory.createGroup(forumManager.getClientId(),
out.toByteArray());
return new Forum(g, name, salt);
}
private Forum parseForum(Group g) throws FormatException {
ByteArrayInputStream in = new ByteArrayInputStream(g.getDescriptor());
BdfReader r = bdfReaderFactory.createReader(in);
try {
r.readListStart();
String name = r.readString(MAX_FORUM_NAME_LENGTH);
byte[] salt = r.readRaw(FORUM_SALT_LENGTH);
r.readListEnd();
if (!r.eof()) throw new FormatException();
return new Forum(g, name, salt);
} catch (FormatException e) {
throw e;
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);
}
}
// Locking: lock.readLock
private boolean listContains(GroupId g, GroupId forum, boolean local)
throws DbException, FormatException {
LatestUpdate latest = findLatest(g, local);
if (latest == null) return false;
List<Forum> list = parseForumList(db.getRawMessage(latest.messageId));
for (Forum f : list) if (f.getId().equals(forum)) return true;
return false;
}
// Locking: lock.writeLock
private boolean addToList(GroupId g, Forum f) throws DbException,
FormatException {
LatestUpdate latest = findLatest(g, true);
if (latest == null) {
storeMessage(g, Collections.singletonList(f), 0);
return true;
}
List<Forum> list = parseForumList(db.getRawMessage(latest.messageId));
if (list.contains(f)) return false;
list.add(f);
storeMessage(g, list, latest.version + 1);
return true;
}
// Locking: lock.writeLock
private void removeFromList(GroupId g, Forum f) throws DbException,
FormatException {
LatestUpdate latest = findLatest(g, true);
if (latest == null) return;
List<Forum> list = parseForumList(db.getRawMessage(latest.messageId));
if (list.remove(f)) storeMessage(g, list, latest.version + 1);
}
private static class LatestUpdate {
private final MessageId messageId;
private final long version;
private LatestUpdate(MessageId messageId, long version) {
this.messageId = messageId;
this.version = version;
}
}
}

View File

@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import org.briarproject.api.FormatException;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.contact.ContactManager.AddContactHook;
import org.briarproject.api.contact.ContactManager.RemoveContactHook;
import org.briarproject.api.data.BdfDictionary;
@@ -50,18 +51,19 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
Logger.getLogger(MessagingManagerImpl.class.getName());
private final DatabaseComponent db;
private final ContactManager contactManager;
private final PrivateGroupFactory privateGroupFactory;
private final BdfReaderFactory bdfReaderFactory;
private final MetadataEncoder metadataEncoder;
private final MetadataParser metadataParser;
@Inject
MessagingManagerImpl(DatabaseComponent db,
MessagingManagerImpl(DatabaseComponent db, ContactManager contactManager,
PrivateGroupFactory privateGroupFactory,
BdfReaderFactory bdfReaderFactory,
MetadataEncoder metadataEncoder,
BdfReaderFactory bdfReaderFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser) {
this.db = db;
this.contactManager = contactManager;
this.privateGroupFactory = privateGroupFactory;
this.bdfReaderFactory = bdfReaderFactory;
this.metadataEncoder = metadataEncoder;
@@ -71,8 +73,8 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
@Override
public void addingContact(ContactId c) {
try {
// Create the conversation group
Group g = getConversationGroup(db.getContact(c));
// Create a group to share with the contact
Group g = getContactGroup(db.getContact(c));
// Store the group and share it with the contact
db.addGroup(g);
db.setVisibility(g.getId(), Collections.singletonList(c));
@@ -87,14 +89,14 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
}
}
private Group getConversationGroup(Contact c) {
private Group getContactGroup(Contact c) {
return privateGroupFactory.createPrivateGroup(CLIENT_ID, c);
}
@Override
public void removingContact(ContactId c) {
try {
db.removeGroup(getConversationGroup(db.getContact(c)));
db.removeGroup(getContactGroup(db.getContact(c)));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -135,7 +137,7 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
@Override
public GroupId getConversationId(ContactId c) throws DbException {
return getConversationGroup(db.getContact(c)).getId();
return getContactGroup(contactManager.getContact(c)).getId();
}
@Override
@@ -172,15 +174,16 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
try {
// Extract the private message body
r.readListStart();
if (r.hasRaw()) r.skipRaw(); // Parent ID
else r.skipNull(); // No parent
r.skipString(); // Content type
return r.readRaw(MAX_PRIVATE_MESSAGE_BODY_LENGTH);
byte[] messageBody = r.readRaw(MAX_PRIVATE_MESSAGE_BODY_LENGTH);
r.readListEnd();
if (!r.eof()) throw new FormatException();
return messageBody;
} catch (FormatException e) {
// Not a valid private message
throw new IllegalArgumentException();
throw new DbException(e);
} catch (IOException e) {
// Shouldn't happen with ByteArrayInputStream
throw new RuntimeException(e);

View File

@@ -7,6 +7,7 @@ import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageValidator;
@@ -16,8 +17,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.logging.Logger;
import javax.inject.Inject;
import static org.briarproject.api.messaging.MessagingConstants.MAX_CONTENT_TYPE_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PRIVATE_MESSAGE_BODY_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
@@ -32,7 +31,6 @@ class PrivateMessageValidator implements MessageValidator {
private final MetadataEncoder metadataEncoder;
private final Clock clock;
@Inject
PrivateMessageValidator(BdfReaderFactory bdfReaderFactory,
MetadataEncoder metadataEncoder, Clock clock) {
this.bdfReaderFactory = bdfReaderFactory;
@@ -41,7 +39,7 @@ class PrivateMessageValidator implements MessageValidator {
}
@Override
public Metadata validateMessage(Message m) {
public Metadata validateMessage(Message m, Group g) {
// Reject the message if it's too far in the future
long now = clock.currentTimeMillis();
if (m.getTimestamp() - now > MAX_CLOCK_DIFFERENCE) {
@@ -55,7 +53,6 @@ class PrivateMessageValidator implements MessageValidator {
MESSAGE_HEADER_LENGTH, raw.length - MESSAGE_HEADER_LENGTH);
BdfReader r = bdfReaderFactory.createReader(in);
MessageId parent = null;
String contentType;
r.readListStart();
// Read the parent ID, if any
if (r.hasRaw()) {
@@ -66,7 +63,7 @@ class PrivateMessageValidator implements MessageValidator {
r.readNull();
}
// Read the content type
contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
String contentType = r.readString(MAX_CONTENT_TYPE_LENGTH);
// Read the private message body
r.readRaw(MAX_PRIVATE_MESSAGE_BODY_LENGTH);
r.readListEnd();

View File

@@ -7,6 +7,7 @@ import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.contact.ContactManager;
import org.briarproject.api.contact.ContactManager.AddContactHook;
import org.briarproject.api.contact.ContactManager.RemoveContactHook;
import org.briarproject.api.data.BdfDictionary;
@@ -60,6 +61,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
Logger.getLogger(TransportPropertyManagerImpl.class.getName());
private final DatabaseComponent db;
private final ContactManager contactManager;
private final PrivateGroupFactory privateGroupFactory;
private final MessageFactory messageFactory;
private final BdfReaderFactory bdfReaderFactory;
@@ -74,11 +76,13 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
@Inject
TransportPropertyManagerImpl(DatabaseComponent db,
GroupFactory groupFactory, PrivateGroupFactory privateGroupFactory,
ContactManager contactManager, GroupFactory groupFactory,
PrivateGroupFactory privateGroupFactory,
MessageFactory messageFactory, BdfReaderFactory bdfReaderFactory,
BdfWriterFactory bdfWriterFactory, MetadataEncoder metadataEncoder,
MetadataParser metadataParser, Clock clock) {
this.db = db;
this.contactManager = contactManager;
this.privateGroupFactory = privateGroupFactory;
this.messageFactory = messageFactory;
this.bdfReaderFactory = bdfReaderFactory;
@@ -109,7 +113,145 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} catch (FormatException e) {
throw new RuntimeException(e);
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removingContact(ContactId c) {
lock.writeLock().lock();
try {
db.removeGroup(getContactGroup(db.getContact(c)));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void addRemoteProperties(ContactId c, DeviceId dev,
Map<TransportId, TransportProperties> props) throws DbException {
lock.writeLock().lock();
try {
Group g = getContactGroup(db.getContact(c));
for (Entry<TransportId, TransportProperties> e : props.entrySet()) {
storeMessage(g.getId(), dev, e.getKey(), e.getValue(), 0, false,
false);
}
} catch (FormatException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Map<TransportId, TransportProperties> getLocalProperties()
throws DbException {
lock.readLock().lock();
try {
// Find the latest local update for each transport
Map<TransportId, LatestUpdate> latest =
findLatest(localGroup.getId(), true);
// Retrieve and parse the latest local properties
Map<TransportId, TransportProperties> local =
new HashMap<TransportId, TransportProperties>();
for (Entry<TransportId, LatestUpdate> e : latest.entrySet()) {
byte[] raw = db.getRawMessage(e.getValue().messageId);
local.put(e.getKey(), parseProperties(raw));
}
return Collections.unmodifiableMap(local);
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return Collections.emptyMap();
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
lock.readLock().lock();
try {
// Find the latest local update
LatestUpdate latest = findLatest(localGroup.getId(), t, true);
if (latest == null) return null;
// Retrieve and parse the latest local properties
return parseProperties(db.getRawMessage(latest.messageId));
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return null;
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
lock.readLock().lock();
try {
Map<ContactId, TransportProperties> remote =
new HashMap<ContactId, TransportProperties>();
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
// Find the latest remote update
LatestUpdate latest = findLatest(g.getId(), t, false);
if (latest != null) {
// Retrieve and parse the latest remote properties
byte[] raw = db.getRawMessage(latest.messageId);
remote.put(c.getId(), parseProperties(raw));
}
}
return Collections.unmodifiableMap(remote);
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
lock.writeLock().lock();
try {
// Create the local group if necessary
db.addGroup(localGroup);
// Merge the new properties with any existing properties
TransportProperties merged;
LatestUpdate latest = findLatest(localGroup.getId(), t, true);
if (latest == null) {
merged = p;
} else {
byte[] raw = db.getRawMessage(latest.messageId);
TransportProperties old = parseProperties(raw);
merged = new TransportProperties(old);
merged.putAll(p);
if (merged.equals(old)) return; // Unchanged
}
// Store the merged properties in the local group
DeviceId dev = db.getDeviceId();
long version = latest == null ? 1 : latest.version + 1;
storeMessage(localGroup.getId(), dev, t, merged, version, true,
false);
// Store the merged properties in each contact's group
for (Contact c : contactManager.getContacts()) {
Group g = getContactGroup(c);
latest = findLatest(g.getId(), t, true);
version = latest == null ? 1 : latest.version + 1;
storeMessage(g.getId(), dev, t, merged, version, true, true);
}
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
@@ -119,6 +261,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return privateGroupFactory.createPrivateGroup(CLIENT_ID, c);
}
// Locking: lock.writeLock
private void storeMessage(GroupId g, DeviceId dev, TransportId t,
TransportProperties p, long version, boolean local, boolean shared)
throws DbException, FormatException {
@@ -150,80 +293,43 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
return out.toByteArray();
}
@Override
public void removingContact(ContactId c) {
lock.writeLock().lock();
try {
db.removeGroup(getContactGroup(db.getContact(c)));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void addRemoteProperties(ContactId c, DeviceId dev,
Map<TransportId, TransportProperties> props) throws DbException {
lock.writeLock().lock();
try {
Group g = getContactGroup(db.getContact(c));
for (Entry<TransportId, TransportProperties> e : props.entrySet()) {
storeMessage(g.getId(), dev, e.getKey(), e.getValue(), 0, false,
false);
}
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Map<TransportId, TransportProperties> getLocalProperties()
throws DbException {
lock.readLock().lock();
try {
// Find the latest local version for each transport
Map<TransportId, LatestUpdate> latest =
findLatest(localGroup.getId(), true);
// Retrieve and decode the latest local properties
Map<TransportId, TransportProperties> local =
new HashMap<TransportId, TransportProperties>();
for (Entry<TransportId, LatestUpdate> e : latest.entrySet()) {
byte[] raw = db.getRawMessage(e.getValue().messageId);
local.put(e.getKey(), decodeProperties(raw));
}
return Collections.unmodifiableMap(local);
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return Collections.emptyMap();
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
// Locking: lock.readLock
private Map<TransportId, LatestUpdate> findLatest(GroupId g, boolean local)
throws DbException, FormatException {
// TODO: Use metadata queries
Map<TransportId, LatestUpdate> latestUpdates =
new HashMap<TransportId, LatestUpdate>();
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getBoolean("local") != local) continue;
TransportId t = new TransportId(d.getString("transportId"));
long version = d.getInteger("version");
LatestUpdate latest = latestUpdates.get(t);
if (latest == null || version > latest.version)
latestUpdates.put(t, new LatestUpdate(e.getKey(), version));
if (d.getBoolean("local") == local) {
TransportId t = new TransportId(d.getString("transportId"));
long version = d.getInteger("version");
LatestUpdate latest = latestUpdates.get(t);
if (latest == null || version > latest.version)
latestUpdates.put(t, new LatestUpdate(e.getKey(), version));
}
}
return latestUpdates;
}
private TransportProperties decodeProperties(byte[] raw)
// Locking: lock.readLock
private LatestUpdate findLatest(GroupId g, TransportId t, boolean local)
throws DbException, FormatException {
LatestUpdate latest = null;
Map<MessageId, Metadata> metadata = db.getMessageMetadata(g);
for (Entry<MessageId, Metadata> e : metadata.entrySet()) {
BdfDictionary d = metadataParser.parse(e.getValue());
if (d.getString("transportId").equals(t.getString())
&& d.getBoolean("local") == local) {
long version = d.getInteger("version");
if (latest == null || version > latest.version)
latest = new LatestUpdate(e.getKey(), version);
}
}
return latest;
}
private TransportProperties parseProperties(byte[] raw)
throws IOException {
TransportProperties p = new TransportProperties();
ByteArrayInputStream in = new ByteArrayInputStream(raw,
@@ -239,92 +345,12 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
}
r.readDictionaryEnd();
r.readListEnd();
if (!r.eof()) throw new FormatException();
return p;
}
@Override
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
lock.readLock().lock();
try {
// Find the latest local version
LatestUpdate latest = findLatest(localGroup.getId(), true).get(t);
if (latest == null) return null;
// Retrieve and decode the latest local properties
return decodeProperties(db.getRawMessage(latest.messageId));
} catch (NoSuchGroupException e) {
// Local group doesn't exist - there are no local properties
return null;
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
lock.readLock().lock();
try {
Map<ContactId, TransportProperties> remote =
new HashMap<ContactId, TransportProperties>();
for (Contact c : db.getContacts()) {
Group g = getContactGroup(c);
// Find the latest remote version
LatestUpdate latest = findLatest(g.getId(), false).get(t);
if (latest != null) {
// Retrieve and decode the latest remote properties
byte[] raw = db.getRawMessage(latest.messageId);
remote.put(c.getId(), decodeProperties(raw));
}
}
return Collections.unmodifiableMap(remote);
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.readLock().unlock();
}
}
@Override
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
lock.writeLock().lock();
try {
// Create the local group if necessary
db.addGroup(localGroup);
// Merge the new properties with any existing properties
TransportProperties merged;
LatestUpdate latest = findLatest(localGroup.getId(), true).get(t);
if (latest == null) {
merged = p;
} else {
byte[] raw = db.getRawMessage(latest.messageId);
TransportProperties old = decodeProperties(raw);
merged = new TransportProperties(old);
merged.putAll(p);
if (merged.equals(old)) return; // Unchanged
}
// Store the merged properties in the local group
DeviceId dev = db.getDeviceId();
long version = latest == null ? 1 : latest.version + 1;
storeMessage(localGroup.getId(), dev, t, merged, version, true,
false);
// Store the merged properties in each contact's group
for (Contact c : db.getContacts()) {
Group g = getContactGroup(c);
latest = findLatest(g.getId(), true).get(t);
version = latest == null ? 1 : latest.version + 1;
storeMessage(g.getId(), dev, t, merged, version, true, true);
}
} catch (IOException e) {
throw new DbException(e);
} finally {
lock.writeLock().unlock();
}
}
private static class LatestUpdate {
private final MessageId messageId;

View File

@@ -7,6 +7,7 @@ import org.briarproject.api.data.BdfReader;
import org.briarproject.api.data.BdfReaderFactory;
import org.briarproject.api.data.MetadataEncoder;
import org.briarproject.api.db.Metadata;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageValidator;
import org.briarproject.api.system.Clock;
@@ -15,8 +16,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.logging.Logger;
import javax.inject.Inject;
import static org.briarproject.api.TransportId.MAX_TRANSPORT_ID_LENGTH;
import static org.briarproject.api.properties.TransportPropertyConstants.MAX_PROPERTIES_PER_TRANSPORT;
import static org.briarproject.api.properties.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
@@ -32,7 +31,6 @@ class TransportPropertyValidator implements MessageValidator {
private final MetadataEncoder metadataEncoder;
private final Clock clock;
@Inject
TransportPropertyValidator(BdfReaderFactory bdfReaderFactory,
MetadataEncoder metadataEncoder, Clock clock) {
this.bdfReaderFactory = bdfReaderFactory;
@@ -41,7 +39,7 @@ class TransportPropertyValidator implements MessageValidator {
}
@Override
public Metadata validateMessage(Message m) {
public Metadata validateMessage(Message m, Group g) {
// Reject the message if it's too far in the future
long now = clock.currentTimeMillis();
if (m.getTimestamp() - now > MAX_CLOCK_DIFFERENCE) {

View File

@@ -13,7 +13,6 @@ import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageSharedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.MessageValidatedEvent;
import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
@@ -151,9 +150,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageSharedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if (e instanceof MessageValidatedEvent) {
if (((MessageValidatedEvent) e).isValid())
dbExecutor.execute(new GenerateOffer());
} else if (e instanceof GroupVisibilityUpdatedEvent) {
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
if (g.getAffectedContacts().contains(contactId))

View File

@@ -15,6 +15,7 @@ import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.lifecycle.Service;
import org.briarproject.api.sync.ClientId;
import org.briarproject.api.sync.Group;
import org.briarproject.api.sync.GroupId;
import org.briarproject.api.sync.Message;
import org.briarproject.api.sync.MessageId;
@@ -84,7 +85,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
for (MessageId id : db.getMessagesToValidate(c)) {
try {
Message m = parseMessage(id, db.getRawMessage(id));
validateMessage(m, c);
Group g = db.getGroup(m.getGroupId());
validateMessage(m, g);
} catch (NoSuchMessageException e) {
LOG.info("Message removed before validation");
}
@@ -106,15 +108,15 @@ class ValidationManagerImpl implements ValidationManager, Service,
return new Message(id, new GroupId(groupId), timestamp, raw);
}
private void validateMessage(final Message m, final ClientId c) {
private void validateMessage(final Message m, final Group g) {
cryptoExecutor.execute(new Runnable() {
public void run() {
MessageValidator v = validators.get(c);
MessageValidator v = validators.get(g.getClientId());
if (v == null) {
LOG.warning("No validator");
} else {
Metadata meta = v.validateMessage(m);
storeValidationResult(m, c, meta);
Metadata meta = v.validateMessage(m, g);
storeValidationResult(m, g.getClientId(), meta);
}
}
});
@@ -132,6 +134,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
hook.validatingMessage(m, c, meta);
db.mergeMessageMetadata(m.getId(), meta);
db.setMessageValid(m, c, true);
db.setMessageShared(m, true);
}
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");
@@ -146,18 +149,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
@Override
public void eventOccurred(Event e) {
if (e instanceof MessageAddedEvent) {
MessageAddedEvent m = (MessageAddedEvent) e;
// Validate the message if it wasn't created locally
if (m.getContactId() != null) loadClientId(m.getMessage());
MessageAddedEvent m = (MessageAddedEvent) e;
if (m.getContactId() != null) loadGroup(m.getMessage());
}
}
private void loadClientId(final Message m) {
private void loadGroup(final Message m) {
dbExecutor.execute(new Runnable() {
public void run() {
try {
ClientId c = db.getGroup(m.getGroupId()).getClientId();
validateMessage(m, c);
validateMessage(m, db.getGroup(m.getGroupId()));
} catch (NoSuchGroupException e) {
LOG.info("Group removed before validation");
} catch (DbException e) {