Don't allow multiple messages to be queued in memory for validation at startup.

This commit is contained in:
akwizgran
2020-09-24 15:42:18 +01:00
parent 4e5f2e31df
commit 327e12e9f8

View File

@@ -35,6 +35,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject; import javax.inject.Inject;
@@ -124,8 +125,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
Group g = db.getGroup(txn, m.getGroupId()); Group g = db.getGroup(txn, m.getGroupId());
return new Pair<>(m, g); return new Pair<>(m, g);
}); });
validateMessageAsync(mg.getFirst(), mg.getSecond()); validateMessageAsync(mg.getFirst(), mg.getSecond(), unvalidated);
validateNextMessageAsync(unvalidated);
} catch (NoSuchMessageException e) { } catch (NoSuchMessageException e) {
LOG.info("Message removed before validation"); LOG.info("Message removed before validation");
validateNextMessageAsync(unvalidated); validateNextMessageAsync(unvalidated);
@@ -213,12 +213,14 @@ class ValidationManagerImpl implements ValidationManager, Service,
} }
} }
private void validateMessageAsync(Message m, Group g) { private void validateMessageAsync(Message m, Group g,
validationExecutor.execute(() -> validateMessage(m, g)); @Nullable Queue<MessageId> unvalidated) {
validationExecutor.execute(() -> validateMessage(m, g, unvalidated));
} }
@ValidationExecutor @ValidationExecutor
private void validateMessage(Message m, Group g) { private void validateMessage(Message m, Group g,
@Nullable Queue<MessageId> unvalidated) {
ClientMajorVersion cv = ClientMajorVersion cv =
new ClientMajorVersion(g.getClientId(), g.getMajorVersion()); new ClientMajorVersion(g.getClientId(), g.getMajorVersion());
MessageValidator v = validators.get(cv); MessageValidator v = validators.get(cv);
@@ -234,6 +236,8 @@ class ValidationManagerImpl implements ValidationManager, Service,
Queue<MessageId> invalidate = new LinkedList<>(); Queue<MessageId> invalidate = new LinkedList<>();
invalidate.add(m.getId()); invalidate.add(m.getId());
invalidateNextMessageAsync(invalidate); invalidateNextMessageAsync(invalidate);
} finally {
if (unvalidated != null) validateNextMessageAsync(unvalidated);
} }
} }
} }
@@ -440,7 +444,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
try { try {
Group g = db.transactionWithResult(true, txn -> Group g = db.transactionWithResult(true, txn ->
db.getGroup(txn, m.getGroupId())); db.getGroup(txn, m.getGroupId()));
validateMessageAsync(m, g); validateMessageAsync(m, g, null);
} catch (NoSuchGroupException e) { } catch (NoSuchGroupException e) {
LOG.info("Group removed before validation"); LOG.info("Group removed before validation");
} catch (DbException e) { } catch (DbException e) {