Allow sync clients to defer delivery of messages.

This commit is contained in:
akwizgran
2021-05-31 17:12:07 +01:00
parent a5fb3bb4a4
commit 8a04d8edc4
19 changed files with 219 additions and 143 deletions

View File

@@ -44,6 +44,7 @@ import static org.briarproject.bramble.api.properties.TransportPropertyConstants
import static org.briarproject.bramble.api.properties.TransportPropertyConstants.MSG_KEY_TRANSPORT_ID;
import static org.briarproject.bramble.api.properties.TransportPropertyConstants.MSG_KEY_VERSION;
import static org.briarproject.bramble.api.properties.TransportPropertyConstants.REFLECTED_PROPERTY_PREFIX;
import static org.briarproject.bramble.api.sync.validation.IncomingMessageHook.DeliveryAction.ACCEPT_DO_NOT_SHARE;
import static org.briarproject.bramble.util.StringUtils.isNullOrEmpty;
@Immutable
@@ -115,8 +116,8 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
}
@Override
public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException, InvalidMessageException {
public DeliveryAction incomingMessage(Transaction txn, Message m,
Metadata meta) throws DbException, InvalidMessageException {
try {
// Find the latest update for this transport, if any
BdfDictionary d = metadataParser.parse(meta);
@@ -131,14 +132,14 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
// We've already received a newer update - delete this one
db.deleteMessage(txn, m.getId());
db.deleteMessageMetadata(txn, m.getId());
return false;
return ACCEPT_DO_NOT_SHARE;
}
}
txn.attach(new RemoteTransportPropertiesUpdatedEvent(t));
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
return false;
return ACCEPT_DO_NOT_SHARE;
}
@Override

View File

@@ -20,6 +20,7 @@ import org.briarproject.bramble.api.sync.MessageContext;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import org.briarproject.bramble.api.sync.validation.IncomingMessageHook;
import org.briarproject.bramble.api.sync.validation.IncomingMessageHook.DeliveryAction;
import org.briarproject.bramble.api.sync.validation.MessageState;
import org.briarproject.bramble.api.sync.validation.MessageValidator;
import org.briarproject.bramble.api.sync.validation.ValidationManager;
@@ -40,6 +41,10 @@ import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.sync.validation.IncomingMessageHook.DeliveryAction.ACCEPT_DO_NOT_SHARE;
import static org.briarproject.bramble.api.sync.validation.IncomingMessageHook.DeliveryAction.ACCEPT_SHARE;
import static org.briarproject.bramble.api.sync.validation.IncomingMessageHook.DeliveryAction.DEFER;
import static org.briarproject.bramble.api.sync.validation.IncomingMessageHook.DeliveryAction.REJECT;
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
import static org.briarproject.bramble.api.sync.validation.MessageState.INVALID;
import static org.briarproject.bramble.api.sync.validation.MessageState.PENDING;
@@ -185,16 +190,19 @@ class ValidationManagerImpl implements ValidationManager, Service,
int majorVersion = g.getMajorVersion();
Metadata meta =
db.getMessageMetadataForValidator(txn, id);
DeliveryResult result =
DeliveryAction action =
deliverMessage(txn, m, c, majorVersion, meta);
if (result.valid) {
addPendingDependents(txn, id, pending);
if (result.share) {
db.setMessageShared(txn, id);
toShare.addAll(states.keySet());
}
} else {
if (action == REJECT) {
invalidateMessage(txn, id);
addDependentsToInvalidate(txn, id, invalidate);
} else if (action == ACCEPT_SHARE) {
db.setMessageState(txn, m.getId(), DELIVERED);
addPendingDependents(txn, id, pending);
db.setMessageShared(txn, id);
toShare.addAll(states.keySet());
} else if (action == ACCEPT_DO_NOT_SHARE) {
db.setMessageState(txn, m.getId(), DELIVERED);
addPendingDependents(txn, id, pending);
}
}
}
@@ -275,16 +283,21 @@ class ValidationManagerImpl implements ValidationManager, Service,
Metadata meta = context.getMetadata();
db.mergeMessageMetadata(txn, id, meta);
if (allDelivered) {
DeliveryResult result =
DeliveryAction action =
deliverMessage(txn, m, c, majorVersion, meta);
if (result.valid) {
addPendingDependents(txn, id, pending);
if (result.share) {
db.setMessageShared(txn, id);
toShare.addAll(dependencies);
}
} else {
if (action == REJECT) {
invalidateMessage(txn, id);
addDependentsToInvalidate(txn, id, invalidate);
} else if (action == DEFER) {
db.setMessageState(txn, id, PENDING);
} else if (action == ACCEPT_SHARE) {
db.setMessageState(txn, id, DELIVERED);
addPendingDependents(txn, id, pending);
db.setMessageShared(txn, id);
toShare.addAll(dependencies);
} else if (action == ACCEPT_DO_NOT_SHARE) {
db.setMessageState(txn, id, DELIVERED);
addPendingDependents(txn, id, pending);
}
} else {
db.setMessageState(txn, id, PENDING);
@@ -304,23 +317,21 @@ class ValidationManagerImpl implements ValidationManager, Service,
}
@DatabaseExecutor
private DeliveryResult deliverMessage(Transaction txn, Message m,
ClientId c, int majorVersion, Metadata meta) throws DbException {
// Deliver the message to the client if it's registered a hook
boolean shareMsg = false;
private DeliveryAction deliverMessage(Transaction txn, Message m,
ClientId c, int majorVersion, Metadata meta) {
// Deliver the message to the client if it has registered a hook
ClientMajorVersion cv = new ClientMajorVersion(c, majorVersion);
IncomingMessageHook hook = hooks.get(cv);
if (hook != null) {
try {
shareMsg = hook.incomingMessage(txn, m, meta);
} catch (InvalidMessageException e) {
logException(LOG, INFO, e);
invalidateMessage(txn, m.getId());
return new DeliveryResult(false, false);
}
if (hook == null) return ACCEPT_DO_NOT_SHARE;
try {
return hook.incomingMessage(txn, m, meta);
} catch (DbException e) {
logException(LOG, INFO, e);
return DEFER;
} catch (InvalidMessageException e) {
logException(LOG, INFO, e);
return REJECT;
}
db.setMessageState(txn, m.getId(), DELIVERED);
return new DeliveryResult(true, shareMsg);
}
@DatabaseExecutor
@@ -447,14 +458,4 @@ class ValidationManagerImpl implements ValidationManager, Service,
logException(LOG, WARNING, e);
}
}
private static class DeliveryResult {
private final boolean valid, share;
private DeliveryResult(boolean valid, boolean share) {
this.valid = valid;
this.share = share;
}
}
}

View File

@@ -50,6 +50,7 @@ import static java.util.Collections.emptyList;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
import static org.briarproject.bramble.api.sync.validation.IncomingMessageHook.DeliveryAction.ACCEPT_DO_NOT_SHARE;
import static org.briarproject.bramble.versioning.ClientVersioningConstants.MSG_KEY_LOCAL;
import static org.briarproject.bramble.versioning.ClientVersioningConstants.MSG_KEY_UPDATE_VERSION;
@@ -173,8 +174,8 @@ class ClientVersioningManagerImpl implements ClientVersioningManager,
}
@Override
public boolean incomingMessage(Transaction txn, Message m, Metadata meta)
throws DbException, InvalidMessageException {
public DeliveryAction incomingMessage(Transaction txn, Message m,
Metadata meta) throws DbException, InvalidMessageException {
try {
// Parse the new remote update
Update newRemoteUpdate = parseUpdate(clientHelper.toList(m));
@@ -187,7 +188,7 @@ class ClientVersioningManagerImpl implements ClientVersioningManager,
&& latest.remote.updateVersion > newRemoteUpdateVersion) {
db.deleteMessage(txn, m.getId());
db.deleteMessageMetadata(txn, m.getId());
return false;
return ACCEPT_DO_NOT_SHARE;
}
// Load and parse the latest local update
if (latest.local == null) throw new DbException();
@@ -241,7 +242,7 @@ class ClientVersioningManagerImpl implements ClientVersioningManager,
} catch (FormatException e) {
throw new InvalidMessageException(e);
}
return false;
return ACCEPT_DO_NOT_SHARE;
}
private void storeClientVersions(Transaction txn,