Allow messages to be marked as temporary.

This commit is contained in:
akwizgran
2019-06-12 15:11:10 +01:00
parent dd7accfa95
commit cd40e771d2
17 changed files with 267 additions and 89 deletions

View File

@@ -92,7 +92,8 @@ class ClientHelperImpl implements ClientHelper {
public void addLocalMessage(Transaction txn, Message m,
BdfDictionary metadata, boolean shared)
throws DbException, FormatException {
db.addLocalMessage(txn, m, metadataEncoder.encode(metadata), shared);
db.addLocalMessage(txn, m, metadataEncoder.encode(metadata), shared,
false);
}
@Override

View File

@@ -115,7 +115,7 @@ interface Database<T> {
* if the message was created locally.
*/
void addMessage(T txn, Message m, MessageState state, boolean shared,
@Nullable ContactId sender) throws DbException;
boolean temporary, @Nullable ContactId sender) throws DbException;
/**
* Adds a dependency between two messages, where the dependent message is
@@ -630,6 +630,12 @@ interface Database<T> {
*/
void removePendingContact(T txn, PendingContactId p) throws DbException;
/**
* Removes all temporary messages (and all associated state) from the
* database.
*/
void removeTemporaryMessages(T txn) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
*/
@@ -671,6 +677,11 @@ interface Database<T> {
void setHandshakeKeyPair(T txn, AuthorId local, PublicKey publicKey,
PrivateKey privateKey) throws DbException;
/**
* Marks the given message as permanent, i.e. not temporary.
*/
void setMessagePermanent(T txn, MessageId m) throws DbException;
/**
* Marks the given message as shared.
*/

View File

@@ -273,13 +273,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Override
public void addLocalMessage(Transaction transaction, Message m,
Metadata meta, boolean shared) throws DbException {
Metadata meta, boolean shared, boolean temporary)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsGroup(txn, m.getGroupId()))
throw new NoSuchGroupException();
if (!db.containsMessage(txn, m.getId())) {
db.addMessage(txn, m, DELIVERED, shared, null);
db.addMessage(txn, m, DELIVERED, shared, temporary, null);
transaction.attach(new MessageAddedEvent(m, null));
transaction.attach(new MessageStateChangedEvent(m.getId(), true,
DELIVERED));
@@ -800,7 +801,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.raiseSeenFlag(txn, c, m.getId());
db.raiseAckFlag(txn, c, m.getId());
} else {
db.addMessage(txn, m, UNKNOWN, false, c);
db.addMessage(txn, m, UNKNOWN, false, false, c);
transaction.attach(new MessageAddedEvent(m, c));
}
transaction.attach(new MessageToAckEvent(c));
@@ -908,6 +909,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new PendingContactRemovedEvent(p));
}
@Override
public void removeTemporaryMessages(Transaction transaction)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
db.removeTemporaryMessages(txn);
}
@Override
public void removeTransport(Transaction transaction, TransportId t)
throws DbException {
@@ -967,6 +976,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
}
@Override
public void setMessagePermanent(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
db.setMessagePermanent(txn, m);
}
@Override
public void setMessageShared(Transaction transaction, MessageId m)
throws DbException {
@@ -975,8 +994,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
if (db.getMessageState(txn, m) != DELIVERED)
throw new IllegalArgumentException(
"Shared undelivered message");
throw new IllegalArgumentException("Shared undelivered message");
db.setMessageShared(txn, m);
transaction.attach(new MessageSharedEvent(m));
}

View File

@@ -62,6 +62,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import static java.sql.Types.BINARY;
import static java.sql.Types.BOOLEAN;
@@ -97,7 +98,7 @@ import static org.briarproject.bramble.util.LogUtils.now;
abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing
static final int CODE_SCHEMA_VERSION = 45;
static final int CODE_SCHEMA_VERSION = 46;
// Time period offsets for incoming transport keys
private static final int OFFSET_PREV = -1;
@@ -177,6 +178,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " timestamp BIGINT NOT NULL,"
+ " state INT NOT NULL,"
+ " shared BOOLEAN NOT NULL,"
+ " temporary BOOLEAN NOT NULL,"
+ " length INT NOT NULL,"
+ " raw BLOB," // Null if message has been deleted
+ " PRIMARY KEY (messageId),"
@@ -336,25 +338,26 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final Logger LOG =
getLogger(JdbcDatabase.class.getName());
// Different database libraries use different names for certain types
private final MessageFactory messageFactory;
private final Clock clock;
private final DatabaseTypes dbTypes;
// Locking: connectionsLock
private final Lock connectionsLock = new ReentrantLock();
private final Condition connectionsChanged = connectionsLock.newCondition();
@GuardedBy("connectionsLock")
private final LinkedList<Connection> connections = new LinkedList<>();
private int openConnections = 0; // Locking: connectionsLock
private boolean closed = false; // Locking: connectionsLock
@GuardedBy("connectionsLock")
private int openConnections = 0;
@GuardedBy("connectionsLock")
private boolean closed = false;
protected abstract Connection createConnection()
throws DbException, SQLException;
protected abstract void compactAndClose() throws DbException;
private final Lock connectionsLock = new ReentrantLock();
private final Condition connectionsChanged = connectionsLock.newCondition();
JdbcDatabase(DatabaseTypes databaseTypes, MessageFactory messageFactory,
Clock clock) {
this.dbTypes = databaseTypes;
@@ -457,7 +460,8 @@ abstract class JdbcDatabase implements Database<Connection> {
new Migration41_42(dbTypes),
new Migration42_43(dbTypes),
new Migration43_44(dbTypes),
new Migration44_45()
new Migration44_45(),
new Migration45_46()
);
}
@@ -777,22 +781,23 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override
public void addMessage(Connection txn, Message m, MessageState state,
boolean messageShared, @Nullable ContactId sender)
boolean shared, boolean temporary, @Nullable ContactId sender)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO messages (messageId, groupId, timestamp,"
+ " state, shared, length, raw)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?)";
+ " state, shared, temporary, length, raw)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getId().getBytes());
ps.setBytes(2, m.getGroupId().getBytes());
ps.setLong(3, m.getTimestamp());
ps.setInt(4, state.getValue());
ps.setBoolean(5, messageShared);
ps.setBoolean(5, shared);
ps.setBoolean(6, temporary);
byte[] raw = messageFactory.getRawMessage(m);
ps.setInt(6, raw.length);
ps.setBytes(7, raw);
ps.setInt(7, raw.length);
ps.setBytes(8, raw);
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
@@ -804,8 +809,7 @@ abstract class JdbcDatabase implements Database<Connection> {
boolean offered = removeOfferedMessage(txn, c, m.getId());
boolean seen = offered || c.equals(sender);
addStatus(txn, m.getId(), c, m.getGroupId(), m.getTimestamp(),
raw.length, state, e.getValue(), messageShared,
false, seen);
raw.length, state, e.getValue(), shared, false, seen);
}
// Update denormalised column in messageDependencies if dependency
// is in same group as dependent
@@ -2876,6 +2880,21 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void removeTemporaryMessages(Connection txn) throws DbException {
Statement s = null;
try {
String sql = "DELETE FROM messages WHERE temporary = TRUE";
s = txn.createStatement();
int affected = s.executeUpdate(sql);
if (affected < 0) throw new DbStateException();
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void removeTransport(Connection txn, TransportId t)
throws DbException {
@@ -3021,6 +3040,24 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void setMessagePermanent(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE messages SET temporary = FALSE"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void setMessageShared(Connection txn, MessageId m)
throws DbException {

View File

@@ -0,0 +1,41 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration45_46 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration45_46.class.getName());
@Override
public int getStartVersion() {
return 45;
}
@Override
public int getEndVersion() {
return 46;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("ALTER TABLE messages"
+ " ADD COLUMN temporary BOOLEAN NOT NULL"
+ " DEFAULT (FALSE)");
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}

View File

@@ -243,7 +243,7 @@ class ClientVersioningManagerImpl implements ClientVersioningManager,
try {
Message m = clientHelper.createMessage(localGroup.getId(), now,
body);
db.addLocalMessage(txn, m, new Metadata(), false);
db.addLocalMessage(txn, m, new Metadata(), false, false);
} catch (FormatException e) {
throw new AssertionError(e);
}