Add methods to get StreamContext from tag, and mark it as recognised

Separate methods are needed to be able to restart reading from a stream
in the case of errors. Tag should be marked as recognised only after
successfully reading the stream.

Closes #2225
This commit is contained in:
Daniel Lublin
2021-12-10 13:16:08 +01:00
parent 0a98566298
commit f8b3d79813
5 changed files with 179 additions and 45 deletions

View File

@@ -113,9 +113,25 @@ public interface KeyManager {
/**
* Looks up the given tag and returns a {@link StreamContext} for reading
* from the corresponding stream, or null if an error occurs or the tag was
* unexpected.
* unexpected. Marks the tag as recognised and updates the reordering
* window.
*/
@Nullable
StreamContext getStreamContext(TransportId t, byte[] tag)
throws DbException;
/**
* Looks up the given tag and returns a {@link StreamContext} for reading
* from the corresponding stream, or null if an error occurs or the tag was
* unexpected. Only returns the StreamContext; does not mark the tag as
* recognised.
*/
@Nullable
StreamContext getStreamContextOnly(TransportId t, byte[] tag)
throws DbException;
/**
* Marks the tag as recognised and updates the reordering window.
*/
void markTagAsRecognised(TransportId t, byte[] tag) throws DbException;
}

View File

@@ -215,6 +215,23 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
m.getStreamContext(txn, tag)));
}
@Override
public StreamContext getStreamContextOnly(TransportId t, byte[] tag)
throws DbException {
return withManager(t, m ->
db.transactionWithNullableResult(false, txn ->
m.getStreamContextOnly(txn, tag)));
}
@Override
public void markTagAsRecognised(TransportId t, byte[] tag)
throws DbException {
withManager(t, m -> {
db.transaction(false, txn -> m.markTagAsRecognised(txn, tag));
return null;
});
}
@Override
public void eventOccurred(Event e) {
if (e instanceof ContactRemovedEvent) {

View File

@@ -48,4 +48,9 @@ interface TransportKeyManager {
StreamContext getStreamContext(Transaction txn, byte[] tag)
throws DbException;
@Nullable
StreamContext getStreamContextOnly(Transaction txn, byte[] tag);
void markTagAsRecognised(Transaction txn, byte[] tag) throws DbException;
}

View File

@@ -393,56 +393,82 @@ class TransportKeyManagerImpl implements TransportKeyManager {
throws DbException {
lock.lock();
try {
// Look up the incoming keys for the tag
TagContext tagCtx = inContexts.remove(new Bytes(tag));
if (tagCtx == null) return null;
MutableIncomingKeys inKeys = tagCtx.inKeys;
// Create a stream context
StreamContext ctx = new StreamContext(tagCtx.contactId,
tagCtx.pendingContactId, transportId,
inKeys.getTagKey(), inKeys.getHeaderKey(),
tagCtx.streamNumber, tagCtx.handshakeMode);
// Update the reordering window
ReorderingWindow window = inKeys.getWindow();
Change change = window.setSeen(tagCtx.streamNumber);
// Add tags for any stream numbers added to the window
for (long streamNumber : change.getAdded()) {
byte[] addTag = new byte[TAG_LENGTH];
transportCrypto.encodeTag(addTag, inKeys.getTagKey(),
PROTOCOL_VERSION, streamNumber);
TagContext tagCtx1 = new TagContext(tagCtx.keySetId,
tagCtx.contactId, tagCtx.pendingContactId, inKeys,
streamNumber, tagCtx.handshakeMode);
inContexts.put(new Bytes(addTag), tagCtx1);
}
// Remove tags for any stream numbers removed from the window
for (long streamNumber : change.getRemoved()) {
if (streamNumber == tagCtx.streamNumber) continue;
byte[] removeTag = new byte[TAG_LENGTH];
transportCrypto.encodeTag(removeTag, inKeys.getTagKey(),
PROTOCOL_VERSION, streamNumber);
inContexts.remove(new Bytes(removeTag));
}
// Write the window back to the DB
db.setReorderingWindow(txn, tagCtx.keySetId, transportId,
inKeys.getTimePeriod(), window.getBase(),
window.getBitmap());
// If the outgoing keys are inactive, activate them
MutableTransportKeySet ks = keys.get(tagCtx.keySetId);
MutableOutgoingKeys outKeys =
ks.getKeys().getCurrentOutgoingKeys();
if (!outKeys.isActive()) {
LOG.info("Activating outgoing keys");
outKeys.activate();
considerReplacingOutgoingKeys(ks);
db.setTransportKeysActive(txn, transportId, tagCtx.keySetId);
}
StreamContext ctx = streamContextFromTag(tag);
if (ctx == null) return null;
markTagAsRecognised(txn, tag);
return ctx;
} finally {
lock.unlock();
}
}
@Override
public StreamContext getStreamContextOnly(Transaction txn, byte[] tag) {
lock.lock();
try {
return streamContextFromTag(tag);
} finally {
lock.unlock();
}
}
@GuardedBy("lock")
@Nullable
private StreamContext streamContextFromTag(byte[] tag) {
// Look up the incoming keys for the tag
TagContext tagCtx = inContexts.get(new Bytes(tag));
if (tagCtx == null) return null;
MutableIncomingKeys inKeys = tagCtx.inKeys;
// Create a stream context
return new StreamContext(tagCtx.contactId,
tagCtx.pendingContactId, transportId,
inKeys.getTagKey(), inKeys.getHeaderKey(),
tagCtx.streamNumber, tagCtx.handshakeMode);
}
@Override
public void markTagAsRecognised(Transaction txn, byte[] tag)
throws DbException {
TagContext tagCtx = inContexts.remove(new Bytes(tag));
if (tagCtx == null) return;
MutableIncomingKeys inKeys = tagCtx.inKeys;
// Update the reordering window
ReorderingWindow window = inKeys.getWindow();
Change change = window.setSeen(tagCtx.streamNumber);
// Add tags for any stream numbers added to the window
for (long streamNumber : change.getAdded()) {
byte[] addTag = new byte[TAG_LENGTH];
transportCrypto.encodeTag(addTag, inKeys.getTagKey(),
PROTOCOL_VERSION, streamNumber);
TagContext tagCtx1 = new TagContext(tagCtx.keySetId,
tagCtx.contactId, tagCtx.pendingContactId, inKeys,
streamNumber, tagCtx.handshakeMode);
inContexts.put(new Bytes(addTag), tagCtx1);
}
// Remove tags for any stream numbers removed from the window
for (long streamNumber : change.getRemoved()) {
if (streamNumber == tagCtx.streamNumber) continue;
byte[] removeTag = new byte[TAG_LENGTH];
transportCrypto.encodeTag(removeTag, inKeys.getTagKey(),
PROTOCOL_VERSION, streamNumber);
inContexts.remove(new Bytes(removeTag));
}
// Write the window back to the DB
db.setReorderingWindow(txn, tagCtx.keySetId, transportId,
inKeys.getTimePeriod(), window.getBase(),
window.getBitmap());
// If the outgoing keys are inactive, activate them
MutableTransportKeySet ks = keys.get(tagCtx.keySetId);
MutableOutgoingKeys outKeys =
ks.getKeys().getCurrentOutgoingKeys();
if (!outKeys.isActive()) {
LOG.info("Activating outgoing keys");
outKeys.activate();
considerReplacingOutgoingKeys(ks);
db.setTransportKeysActive(txn, transportId, tagCtx.keySetId);
}
}
@DatabaseExecutor
@Wakeful
private void updateKeys(Transaction txn) throws DbException {

View File

@@ -393,6 +393,76 @@ public class TransportKeyManagerImplTest extends BrambleMockTestCase {
assertNull(transportKeyManager.getStreamContext(txn, tag));
}
@Test
public void testGetStreamContextOnlyAndMarkTag() throws Exception {
boolean alice = random.nextBoolean();
TransportKeys transportKeys = createTransportKeys(1000, 0, true);
Transaction txn = new Transaction(null, false);
// Keep a copy of the tags
List<byte[]> tags = new ArrayList<>();
context.checking(new Expectations() {{
oneOf(transportCrypto).deriveRotationKeys(transportId, rootKey,
1000, alice, true);
will(returnValue(transportKeys));
// Get the current time (the start of time period 1000)
oneOf(clock).currentTimeMillis();
will(returnValue(timePeriodLength * 1000));
// Encode the tags (3 sets)
for (long i = 0; i < REORDERING_WINDOW_SIZE; i++) {
exactly(3).of(transportCrypto).encodeTag(
with(any(byte[].class)), with(tagKey),
with(PROTOCOL_VERSION), with(i));
will(new EncodeTagAction(tags));
}
// Updated the transport keys (the keys are unaffected)
oneOf(transportCrypto).updateTransportKeys(transportKeys, 1000);
will(returnValue(transportKeys));
// Save the keys
oneOf(db).addTransportKeys(txn, contactId, transportKeys);
will(returnValue(keySetId));
// Encode a new tag after sliding the window
oneOf(transportCrypto).encodeTag(with(any(byte[].class)),
with(tagKey), with(PROTOCOL_VERSION),
with((long) REORDERING_WINDOW_SIZE));
will(new EncodeTagAction(tags));
// Save the reordering window (previous time period, base 1)
oneOf(db).setReorderingWindow(txn, keySetId, transportId, 999,
1, new byte[REORDERING_WINDOW_SIZE / 8]);
}});
// The timestamp is at the start of time period 1000
long timestamp = timePeriodLength * 1000;
assertEquals(keySetId, transportKeyManager.addRotationKeys(
txn, contactId, rootKey, timestamp, alice, true));
assertTrue(transportKeyManager.canSendOutgoingStreams(contactId));
// Use the first tag (previous time period, stream number 0)
assertEquals(REORDERING_WINDOW_SIZE * 3, tags.size());
byte[] tag = tags.get(0);
// Repeated request should return same stream context
StreamContext ctx = transportKeyManager.getStreamContextOnly(txn, tag);
assertNotNull(ctx);
assertEquals(contactId, ctx.getContactId());
assertEquals(transportId, ctx.getTransportId());
assertEquals(tagKey, ctx.getTagKey());
assertEquals(headerKey, ctx.getHeaderKey());
assertEquals(0L, ctx.getStreamNumber());
ctx = transportKeyManager.getStreamContextOnly(txn, tag);
assertNotNull(ctx);
assertEquals(contactId, ctx.getContactId());
assertEquals(transportId, ctx.getTransportId());
assertEquals(tagKey, ctx.getTagKey());
assertEquals(headerKey, ctx.getHeaderKey());
assertEquals(0L, ctx.getStreamNumber());
// Then mark tag as recognised
transportKeyManager.markTagAsRecognised(txn, tag);
// Another tag should have been encoded
assertEquals(REORDERING_WINDOW_SIZE * 3 + 1, tags.size());
// Finally ensure the used tag is not recognised again
assertNull(transportKeyManager.getStreamContextOnly(txn, tag));
}
@Test
public void testKeysAreUpdatedToCurrentPeriod() throws Exception {
TransportKeys transportKeys = createTransportKeys(1000, 0, true);