Unshared messages.

This commit is contained in:
akwizgran
2016-01-29 13:22:56 +00:00
parent 062b987585
commit e76aef3dc8
13 changed files with 271 additions and 126 deletions

View File

@@ -19,6 +19,7 @@ import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageStatus;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.ValidationManager.Validity;
import org.briarproject.api.transport.TransportKeys;
import java.io.IOException;
@@ -111,7 +112,8 @@ interface Database<T> {
* <p>
* Locking: write.
*/
void addMessage(T txn, Message m, boolean local) throws DbException;
void addMessage(T txn, Message m, Validity validity, boolean shared)
throws DbException;
/**
* Records that a message has been offered by the given contact.
@@ -617,13 +619,20 @@ interface Database<T> {
void setLocalAuthorStatus(T txn, AuthorId a, StorageStatus s)
throws DbException;
/**
* Marks the given message as shared or unshared.
* <p>
* Locking: write.
*/
void setMessageShared(T txn, MessageId m, boolean shared)
throws DbException;
/**
* Marks the given message as valid or invalid.
* <p>
* Locking: write.
*/
void setMessageValidity(T txn, MessageId m, boolean valid)
throws DbException;
void setMessageValid(T txn, MessageId m, boolean valid) throws DbException;
/**
* Sets the reordering window for the given contact and transport in the

View File

@@ -20,6 +20,7 @@ import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageSharedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.MessageValidatedEvent;
@@ -47,6 +48,7 @@ import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.ValidationManager.Validity;
import org.briarproject.api.transport.TransportKeys;
import java.io.IOException;
@@ -64,6 +66,8 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.sync.ValidationManager.Validity.UNKNOWN;
import static org.briarproject.api.sync.ValidationManager.Validity.VALID;
import static org.briarproject.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
/**
@@ -214,8 +218,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
public void addLocalMessage(Message m, ClientId c, Metadata meta)
throws DbException {
public void addLocalMessage(Message m, ClientId c, Metadata meta,
boolean shared) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
@@ -224,7 +228,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new MessageExistsException();
if (!db.containsGroup(txn, m.getGroupId()))
throw new NoSuchSubscriptionException();
addMessage(txn, m, null);
addMessage(txn, m, VALID, shared, null);
db.mergeMessageMetadata(txn, m.getId(), meta);
db.commitTransaction(txn);
} catch (DbException e) {
@@ -244,9 +248,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
* Locking: write.
* @param sender null for a locally generated message.
*/
private void addMessage(T txn, Message m, ContactId sender)
throws DbException {
db.addMessage(txn, m, sender == null);
private void addMessage(T txn, Message m, Validity validity, boolean shared,
ContactId sender) throws DbException {
db.addMessage(txn, m, validity, shared);
GroupId g = m.getGroupId();
Collection<ContactId> visibility = db.getVisibility(txn, g);
visibility = new HashSet<ContactId>(visibility);
@@ -982,7 +986,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
duplicate = db.containsMessage(txn, m.getId());
visible = db.containsVisibleGroup(txn, c, m.getGroupId());
if (visible) {
if (!duplicate) addMessage(txn, m, c);
if (!duplicate) addMessage(txn, m, UNKNOWN, true, c);
db.raiseAckFlag(txn, c, m.getId());
}
db.commitTransaction(txn);
@@ -1214,7 +1218,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
public void setMessageValidity(Message m, ClientId c, boolean valid)
public void setMessageShared(Message m, boolean shared)
throws DbException {
lock.writeLock().lock();
try {
@@ -1222,7 +1226,27 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
try {
if (!db.containsMessage(txn, m.getId()))
throw new NoSuchMessageException();
db.setMessageValidity(txn, m.getId(), valid);
db.setMessageShared(txn, m.getId(), shared);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (shared) eventBus.broadcast(new MessageSharedEvent(m));
}
public void setMessageValid(Message m, ClientId c, boolean valid)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsMessage(txn, m.getId()))
throw new NoSuchMessageException();
db.setMessageValid(txn, m.getId(), valid);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);

View File

@@ -22,6 +22,7 @@ import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageStatus;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.ValidationManager.Validity;
import org.briarproject.api.system.Clock;
import org.briarproject.api.transport.IncomingKeys;
import org.briarproject.api.transport.OutgoingKeys;
@@ -54,9 +55,9 @@ import static java.util.logging.Level.WARNING;
import static org.briarproject.api.db.Metadata.REMOVE;
import static org.briarproject.api.db.StorageStatus.ADDING;
import static org.briarproject.api.sync.SyncConstants.MAX_SUBSCRIPTIONS;
import static org.briarproject.api.sync.ValidationManager.Status.INVALID;
import static org.briarproject.api.sync.ValidationManager.Status.UNKNOWN;
import static org.briarproject.api.sync.ValidationManager.Status.VALID;
import static org.briarproject.api.sync.ValidationManager.Validity.INVALID;
import static org.briarproject.api.sync.ValidationManager.Validity.UNKNOWN;
import static org.briarproject.api.sync.ValidationManager.Validity.VALID;
import static org.briarproject.db.DatabaseConstants.DB_SETTINGS_NAMESPACE;
import static org.briarproject.db.DatabaseConstants.DEVICE_ID_KEY;
import static org.briarproject.db.DatabaseConstants.DEVICE_SETTINGS_NAMESPACE;
@@ -70,8 +71,8 @@ import static org.briarproject.db.ExponentialBackoff.calculateExpiry;
*/
abstract class JdbcDatabase implements Database<Connection> {
private static final int SCHEMA_VERSION = 18;
private static final int MIN_SCHEMA_VERSION = 18;
private static final int SCHEMA_VERSION = 19;
private static final int MIN_SCHEMA_VERSION = 19;
private static final String CREATE_SETTINGS =
"CREATE TABLE settings"
@@ -164,8 +165,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " (messageId HASH NOT NULL,"
+ " groupId HASH NOT NULL,"
+ " timestamp BIGINT NOT NULL,"
+ " local BOOLEAN NOT NULL,"
+ " valid INT NOT NULL,"
+ " shared BOOLEAN NOT NULL,"
+ " length INT NOT NULL,"
+ " raw BLOB NOT NULL,"
+ " PRIMARY KEY (messageId),"
@@ -674,19 +675,19 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addMessage(Connection txn, Message m, boolean local)
throws DbException {
public void addMessage(Connection txn, Message m, Validity validity,
boolean shared) throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO messages (messageId, groupId, timestamp,"
+ " local, valid, length, raw)"
+ " valid, shared, length, raw)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getId().getBytes());
ps.setBytes(2, m.getGroupId().getBytes());
ps.setLong(3, m.getTimestamp());
ps.setBoolean(4, local);
ps.setInt(5, local ? VALID.getValue() : UNKNOWN.getValue());
ps.setInt(4, validity.getValue());
ps.setBoolean(5, shared);
byte[] raw = m.getRaw();
ps.setInt(6, raw.length);
ps.setBytes(7, raw);
@@ -1031,7 +1032,8 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " JOIN groupVisibilities AS gv"
+ " ON m.groupId = gv.groupId"
+ " WHERE messageId = ?"
+ " AND contactId = ?";
+ " AND contactId = ?"
+ " AND shared = TRUE";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
@@ -1482,7 +1484,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND valid = ?"
+ " AND valid = ? AND shared = TRUE"
+ " AND seen = FALSE AND requested = FALSE"
+ " AND s.expiry < ?"
+ " ORDER BY timestamp DESC LIMIT ?";
@@ -1544,7 +1546,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND valid = ?"
+ " AND valid = ? AND shared = TRUE"
+ " AND seen = FALSE"
+ " AND s.expiry < ?"
+ " ORDER BY timestamp DESC";
@@ -1633,7 +1635,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " ON m.messageId = s.messageId"
+ " AND cg.contactId = s.contactId"
+ " WHERE cg.contactId = ?"
+ " AND valid = ?"
+ " AND valid = ? AND shared = TRUE"
+ " AND seen = FALSE AND requested = TRUE"
+ " AND s.expiry < ?"
+ " ORDER BY timestamp DESC";
@@ -1660,7 +1662,8 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Settings getSettings(Connection txn, String namespace) throws DbException {
public Settings getSettings(Connection txn, String namespace)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -2397,7 +2400,24 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setMessageValidity(Connection txn, MessageId m, boolean valid)
public void setMessageShared(Connection txn, MessageId m, boolean shared)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE messages SET shared = ? WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBoolean(1, shared);
ps.setBytes(2, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setMessageValid(Connection txn, MessageId m, boolean valid)
throws DbException {
PreparedStatement ps = null;
try {

View File

@@ -135,7 +135,7 @@ class ForumManagerImpl implements ForumManager {
d.put("read", true);
try {
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(p.getMessage(), CLIENT_ID, meta);
db.addLocalMessage(p.getMessage(), CLIENT_ID, meta, true);
} catch (FormatException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}

View File

@@ -116,7 +116,7 @@ class MessagingManagerImpl implements MessagingManager, AddContactHook,
d.put("read", true);
try {
Metadata meta = metadataEncoder.encode(d);
db.addLocalMessage(m.getMessage(), CLIENT_ID, meta);
db.addLocalMessage(m.getMessage(), CLIENT_ID, meta, true);
} catch (FormatException e) {
throw new RuntimeException(e);
}

View File

@@ -130,7 +130,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
d.put("transportId", t.getString());
d.put("version", version);
d.put("local", true);
db.addLocalMessage(m, CLIENT_ID, metadataEncoder.encode(d));
db.addLocalMessage(m, CLIENT_ID, metadataEncoder.encode(d), true);
}
private byte[] encodeProperties(DeviceId dev, TransportId t,

View File

@@ -10,6 +10,7 @@ import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageSharedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.MessageValidatedEvent;
@@ -154,6 +155,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (e instanceof ContactRemovedEvent) {
ContactRemovedEvent c = (ContactRemovedEvent) e;
if (c.getContactId().equals(contactId)) interrupt();
} else if (e instanceof MessageSharedEvent) {
dbExecutor.execute(new GenerateOffer());
} else if (e instanceof MessageValidatedEvent) {
if (((MessageValidatedEvent) e).isValid())
dbExecutor.execute(new GenerateOffer());

View File

@@ -117,10 +117,10 @@ class ValidationManagerImpl implements ValidationManager, Service,
public void run() {
try {
if (meta == null) {
db.setMessageValidity(m, c, false);
db.setMessageValid(m, c, false);
} else {
db.mergeMessageMetadata(m.getId(), meta);
db.setMessageValidity(m, c, true);
db.setMessageValid(m, c, true);
}
} catch (NoSuchMessageException e) {
LOG.info("Message removed during validation");