Remove client ID from validator's DB methods.

This commit is contained in:
akwizgran
2018-03-09 12:28:48 +00:00
parent 85cc23444c
commit 5fe68e6f82
8 changed files with 82 additions and 94 deletions

View File

@@ -424,31 +424,27 @@ interface Database<T> {
throws DbException;
/**
* Returns the IDs of any messages that need to be validated by the given
* client.
* Returns the IDs of any messages that need to be validated.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToValidate(T txn, ClientId c)
throws DbException;
Collection<MessageId> getMessagesToValidate(T txn) throws DbException;
/**
* Returns the IDs of any messages that are still pending due to
* dependencies to other messages for the given client.
* Returns the IDs of any messages that are pending delivery due to
* dependencies on other messages.
* <p/>
* Read-only.
*/
Collection<MessageId> getPendingMessages(T txn, ClientId c)
throws DbException;
Collection<MessageId> getPendingMessages(T txn) throws DbException;
/**
* Returns the IDs of any messages from the given client
* that have a shared dependent, but are still not shared themselves.
* Returns the IDs of any messages that have a shared dependent but have
* not yet been shared themselves.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToShare(T txn, ClientId c)
throws DbException;
Collection<MessageId> getMessagesToShare(T txn) throws DbException;
/**
* Returns the next time (in milliseconds since the Unix epoch) when a

View File

@@ -455,24 +455,24 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Override
public Collection<MessageId> getMessagesToValidate(Transaction transaction,
ClientId c) throws DbException {
public Collection<MessageId> getMessagesToValidate(Transaction transaction)
throws DbException {
T txn = unbox(transaction);
return db.getMessagesToValidate(txn, c);
return db.getMessagesToValidate(txn);
}
@Override
public Collection<MessageId> getPendingMessages(Transaction transaction,
ClientId c) throws DbException {
public Collection<MessageId> getPendingMessages(Transaction transaction)
throws DbException {
T txn = unbox(transaction);
return db.getPendingMessages(txn, c);
return db.getPendingMessages(txn);
}
@Override
public Collection<MessageId> getMessagesToShare(
Transaction transaction, ClientId c) throws DbException {
public Collection<MessageId> getMessagesToShare(Transaction transaction)
throws DbException {
T txn = unbox(transaction);
return db.getMessagesToShare(txn, c);
return db.getMessagesToShare(txn);
}
@Nullable
@@ -573,7 +573,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Override
public long getNextSendTime(Transaction transaction, ContactId c)
throws DbException {
throws DbException {
T txn = unbox(transaction);
return db.getNextSendTime(txn, c);
}

View File

@@ -1868,28 +1868,26 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public Collection<MessageId> getMessagesToValidate(Connection txn,
ClientId c) throws DbException {
return getMessagesInState(txn, c, UNKNOWN);
public Collection<MessageId> getMessagesToValidate(Connection txn)
throws DbException {
return getMessagesInState(txn, UNKNOWN);
}
@Override
public Collection<MessageId> getPendingMessages(Connection txn,
ClientId c) throws DbException {
return getMessagesInState(txn, c, PENDING);
public Collection<MessageId> getPendingMessages(Connection txn)
throws DbException {
return getMessagesInState(txn, PENDING);
}
private Collection<MessageId> getMessagesInState(Connection txn, ClientId c,
private Collection<MessageId> getMessagesInState(Connection txn,
State state) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messageId FROM messages AS m"
+ " JOIN groups AS g ON m.groupId = g.groupId"
+ " WHERE state = ? AND clientId = ? AND raw IS NOT NULL";
String sql = "SELECT messageId FROM messages"
+ " WHERE state = ? AND raw IS NOT NULL";
ps = txn.prepareStatement(sql);
ps.setInt(1, state.getValue());
ps.setString(2, c.getString());
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
@@ -1904,7 +1902,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public Collection<MessageId> getMessagesToShare(Connection txn, ClientId c)
public Collection<MessageId> getMessagesToShare(Connection txn)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -1914,12 +1912,10 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " ON m.messageId = d.dependencyId"
+ " JOIN messages AS m1"
+ " ON d.messageId = m1.messageId"
+ " JOIN groups AS g"
+ " ON m.groupId = g.groupId"
+ " WHERE m.shared = FALSE AND m1.shared = TRUE"
+ " AND g.clientId = ?";
+ " WHERE m.state = ?"
+ " AND m.shared = FALSE AND m1.shared = TRUE";
ps = txn.prepareStatement(sql);
ps.setString(1, c.getString());
ps.setInt(1, DELIVERED.getValue());
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));

View File

@@ -71,11 +71,9 @@ class ValidationManagerImpl implements ValidationManager, Service,
@Override
public void startService() {
if (used.getAndSet(true)) throw new IllegalStateException();
for (ClientId c : validators.keySet()) {
validateOutstandingMessagesAsync(c);
deliverOutstandingMessagesAsync(c);
shareOutstandingMessagesAsync(c);
}
validateOutstandingMessagesAsync();
deliverOutstandingMessagesAsync();
shareOutstandingMessagesAsync();
}
@Override
@@ -93,17 +91,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
hooks.put(c, hook);
}
private void validateOutstandingMessagesAsync(ClientId c) {
dbExecutor.execute(() -> validateOutstandingMessages(c));
private void validateOutstandingMessagesAsync() {
dbExecutor.execute(this::validateOutstandingMessages);
}
@DatabaseExecutor
private void validateOutstandingMessages(ClientId c) {
private void validateOutstandingMessages() {
try {
Queue<MessageId> unvalidated = new LinkedList<>();
Transaction txn = db.startTransaction(true);
try {
unvalidated.addAll(db.getMessagesToValidate(txn, c));
unvalidated.addAll(db.getMessagesToValidate(txn));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
@@ -148,17 +146,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
}
private void deliverOutstandingMessagesAsync(ClientId c) {
dbExecutor.execute(() -> deliverOutstandingMessages(c));
private void deliverOutstandingMessagesAsync() {
dbExecutor.execute(this::deliverOutstandingMessages);
}
@DatabaseExecutor
private void deliverOutstandingMessages(ClientId c) {
private void deliverOutstandingMessages() {
try {
Queue<MessageId> pending = new LinkedList<>();
Transaction txn = db.startTransaction(true);
try {
pending.addAll(db.getPendingMessages(txn, c));
pending.addAll(db.getPendingMessages(txn));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
@@ -353,17 +351,17 @@ class ValidationManagerImpl implements ValidationManager, Service,
return pending;
}
private void shareOutstandingMessagesAsync(ClientId c) {
dbExecutor.execute(() -> shareOutstandingMessages(c));
private void shareOutstandingMessagesAsync() {
dbExecutor.execute(this::shareOutstandingMessages);
}
@DatabaseExecutor
private void shareOutstandingMessages(ClientId c) {
private void shareOutstandingMessages() {
try {
Queue<MessageId> toShare = new LinkedList<>();
Transaction txn = db.startTransaction(true);
try {
toShare.addAll(db.getMessagesToShare(txn, c));
toShare.addAll(db.getMessagesToShare(txn));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);