mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-19 14:19:53 +01:00
Merge branch '2225-error-handling-for-mailbox-downloads' into 'master'
Add methods to get StreamContext from tag, and mark it as recognised Closes #2225 See merge request briar/briar!1560
This commit is contained in:
@@ -113,9 +113,25 @@ public interface KeyManager {
|
|||||||
/**
|
/**
|
||||||
* Looks up the given tag and returns a {@link StreamContext} for reading
|
* Looks up the given tag and returns a {@link StreamContext} for reading
|
||||||
* from the corresponding stream, or null if an error occurs or the tag was
|
* from the corresponding stream, or null if an error occurs or the tag was
|
||||||
* unexpected.
|
* unexpected. Marks the tag as recognised and updates the reordering
|
||||||
|
* window.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
StreamContext getStreamContext(TransportId t, byte[] tag)
|
StreamContext getStreamContext(TransportId t, byte[] tag)
|
||||||
throws DbException;
|
throws DbException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Looks up the given tag and returns a {@link StreamContext} for reading
|
||||||
|
* from the corresponding stream, or null if an error occurs or the tag was
|
||||||
|
* unexpected. Only returns the StreamContext; does not mark the tag as
|
||||||
|
* recognised.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
StreamContext getStreamContextOnly(TransportId t, byte[] tag)
|
||||||
|
throws DbException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks the tag as recognised and updates the reordering window.
|
||||||
|
*/
|
||||||
|
void markTagAsRecognised(TransportId t, byte[] tag) throws DbException;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -215,6 +215,23 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
|
|||||||
m.getStreamContext(txn, tag)));
|
m.getStreamContext(txn, tag)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StreamContext getStreamContextOnly(TransportId t, byte[] tag)
|
||||||
|
throws DbException {
|
||||||
|
return withManager(t, m ->
|
||||||
|
db.transactionWithNullableResult(false, txn ->
|
||||||
|
m.getStreamContextOnly(txn, tag)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void markTagAsRecognised(TransportId t, byte[] tag)
|
||||||
|
throws DbException {
|
||||||
|
withManager(t, m -> {
|
||||||
|
db.transaction(false, txn -> m.markTagAsRecognised(txn, tag));
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void eventOccurred(Event e) {
|
public void eventOccurred(Event e) {
|
||||||
if (e instanceof ContactRemovedEvent) {
|
if (e instanceof ContactRemovedEvent) {
|
||||||
|
|||||||
@@ -48,4 +48,9 @@ interface TransportKeyManager {
|
|||||||
StreamContext getStreamContext(Transaction txn, byte[] tag)
|
StreamContext getStreamContext(Transaction txn, byte[] tag)
|
||||||
throws DbException;
|
throws DbException;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
StreamContext getStreamContextOnly(Transaction txn, byte[] tag);
|
||||||
|
|
||||||
|
void markTagAsRecognised(Transaction txn, byte[] tag) throws DbException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -393,56 +393,82 @@ class TransportKeyManagerImpl implements TransportKeyManager {
|
|||||||
throws DbException {
|
throws DbException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
// Look up the incoming keys for the tag
|
StreamContext ctx = streamContextFromTag(tag);
|
||||||
TagContext tagCtx = inContexts.remove(new Bytes(tag));
|
if (ctx == null) return null;
|
||||||
if (tagCtx == null) return null;
|
markTagAsRecognised(txn, tag);
|
||||||
MutableIncomingKeys inKeys = tagCtx.inKeys;
|
|
||||||
// Create a stream context
|
|
||||||
StreamContext ctx = new StreamContext(tagCtx.contactId,
|
|
||||||
tagCtx.pendingContactId, transportId,
|
|
||||||
inKeys.getTagKey(), inKeys.getHeaderKey(),
|
|
||||||
tagCtx.streamNumber, tagCtx.handshakeMode);
|
|
||||||
// Update the reordering window
|
|
||||||
ReorderingWindow window = inKeys.getWindow();
|
|
||||||
Change change = window.setSeen(tagCtx.streamNumber);
|
|
||||||
// Add tags for any stream numbers added to the window
|
|
||||||
for (long streamNumber : change.getAdded()) {
|
|
||||||
byte[] addTag = new byte[TAG_LENGTH];
|
|
||||||
transportCrypto.encodeTag(addTag, inKeys.getTagKey(),
|
|
||||||
PROTOCOL_VERSION, streamNumber);
|
|
||||||
TagContext tagCtx1 = new TagContext(tagCtx.keySetId,
|
|
||||||
tagCtx.contactId, tagCtx.pendingContactId, inKeys,
|
|
||||||
streamNumber, tagCtx.handshakeMode);
|
|
||||||
inContexts.put(new Bytes(addTag), tagCtx1);
|
|
||||||
}
|
|
||||||
// Remove tags for any stream numbers removed from the window
|
|
||||||
for (long streamNumber : change.getRemoved()) {
|
|
||||||
if (streamNumber == tagCtx.streamNumber) continue;
|
|
||||||
byte[] removeTag = new byte[TAG_LENGTH];
|
|
||||||
transportCrypto.encodeTag(removeTag, inKeys.getTagKey(),
|
|
||||||
PROTOCOL_VERSION, streamNumber);
|
|
||||||
inContexts.remove(new Bytes(removeTag));
|
|
||||||
}
|
|
||||||
// Write the window back to the DB
|
|
||||||
db.setReorderingWindow(txn, tagCtx.keySetId, transportId,
|
|
||||||
inKeys.getTimePeriod(), window.getBase(),
|
|
||||||
window.getBitmap());
|
|
||||||
// If the outgoing keys are inactive, activate them
|
|
||||||
MutableTransportKeySet ks = keys.get(tagCtx.keySetId);
|
|
||||||
MutableOutgoingKeys outKeys =
|
|
||||||
ks.getKeys().getCurrentOutgoingKeys();
|
|
||||||
if (!outKeys.isActive()) {
|
|
||||||
LOG.info("Activating outgoing keys");
|
|
||||||
outKeys.activate();
|
|
||||||
considerReplacingOutgoingKeys(ks);
|
|
||||||
db.setTransportKeysActive(txn, transportId, tagCtx.keySetId);
|
|
||||||
}
|
|
||||||
return ctx;
|
return ctx;
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StreamContext getStreamContextOnly(Transaction txn, byte[] tag) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return streamContextFromTag(tag);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@GuardedBy("lock")
|
||||||
|
@Nullable
|
||||||
|
private StreamContext streamContextFromTag(byte[] tag) {
|
||||||
|
// Look up the incoming keys for the tag
|
||||||
|
TagContext tagCtx = inContexts.get(new Bytes(tag));
|
||||||
|
if (tagCtx == null) return null;
|
||||||
|
MutableIncomingKeys inKeys = tagCtx.inKeys;
|
||||||
|
// Create a stream context
|
||||||
|
return new StreamContext(tagCtx.contactId,
|
||||||
|
tagCtx.pendingContactId, transportId,
|
||||||
|
inKeys.getTagKey(), inKeys.getHeaderKey(),
|
||||||
|
tagCtx.streamNumber, tagCtx.handshakeMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void markTagAsRecognised(Transaction txn, byte[] tag)
|
||||||
|
throws DbException {
|
||||||
|
TagContext tagCtx = inContexts.remove(new Bytes(tag));
|
||||||
|
if (tagCtx == null) return;
|
||||||
|
MutableIncomingKeys inKeys = tagCtx.inKeys;
|
||||||
|
// Update the reordering window
|
||||||
|
ReorderingWindow window = inKeys.getWindow();
|
||||||
|
Change change = window.setSeen(tagCtx.streamNumber);
|
||||||
|
// Add tags for any stream numbers added to the window
|
||||||
|
for (long streamNumber : change.getAdded()) {
|
||||||
|
byte[] addTag = new byte[TAG_LENGTH];
|
||||||
|
transportCrypto.encodeTag(addTag, inKeys.getTagKey(),
|
||||||
|
PROTOCOL_VERSION, streamNumber);
|
||||||
|
TagContext tagCtx1 = new TagContext(tagCtx.keySetId,
|
||||||
|
tagCtx.contactId, tagCtx.pendingContactId, inKeys,
|
||||||
|
streamNumber, tagCtx.handshakeMode);
|
||||||
|
inContexts.put(new Bytes(addTag), tagCtx1);
|
||||||
|
}
|
||||||
|
// Remove tags for any stream numbers removed from the window
|
||||||
|
for (long streamNumber : change.getRemoved()) {
|
||||||
|
if (streamNumber == tagCtx.streamNumber) continue;
|
||||||
|
byte[] removeTag = new byte[TAG_LENGTH];
|
||||||
|
transportCrypto.encodeTag(removeTag, inKeys.getTagKey(),
|
||||||
|
PROTOCOL_VERSION, streamNumber);
|
||||||
|
inContexts.remove(new Bytes(removeTag));
|
||||||
|
}
|
||||||
|
// Write the window back to the DB
|
||||||
|
db.setReorderingWindow(txn, tagCtx.keySetId, transportId,
|
||||||
|
inKeys.getTimePeriod(), window.getBase(),
|
||||||
|
window.getBitmap());
|
||||||
|
// If the outgoing keys are inactive, activate them
|
||||||
|
MutableTransportKeySet ks = keys.get(tagCtx.keySetId);
|
||||||
|
MutableOutgoingKeys outKeys =
|
||||||
|
ks.getKeys().getCurrentOutgoingKeys();
|
||||||
|
if (!outKeys.isActive()) {
|
||||||
|
LOG.info("Activating outgoing keys");
|
||||||
|
outKeys.activate();
|
||||||
|
considerReplacingOutgoingKeys(ks);
|
||||||
|
db.setTransportKeysActive(txn, transportId, tagCtx.keySetId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@DatabaseExecutor
|
@DatabaseExecutor
|
||||||
@Wakeful
|
@Wakeful
|
||||||
private void updateKeys(Transaction txn) throws DbException {
|
private void updateKeys(Transaction txn) throws DbException {
|
||||||
|
|||||||
@@ -393,6 +393,76 @@ public class TransportKeyManagerImplTest extends BrambleMockTestCase {
|
|||||||
assertNull(transportKeyManager.getStreamContext(txn, tag));
|
assertNull(transportKeyManager.getStreamContext(txn, tag));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetStreamContextOnlyAndMarkTag() throws Exception {
|
||||||
|
boolean alice = random.nextBoolean();
|
||||||
|
TransportKeys transportKeys = createTransportKeys(1000, 0, true);
|
||||||
|
Transaction txn = new Transaction(null, false);
|
||||||
|
|
||||||
|
// Keep a copy of the tags
|
||||||
|
List<byte[]> tags = new ArrayList<>();
|
||||||
|
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
oneOf(transportCrypto).deriveRotationKeys(transportId, rootKey,
|
||||||
|
1000, alice, true);
|
||||||
|
will(returnValue(transportKeys));
|
||||||
|
// Get the current time (the start of time period 1000)
|
||||||
|
oneOf(clock).currentTimeMillis();
|
||||||
|
will(returnValue(timePeriodLength * 1000));
|
||||||
|
// Encode the tags (3 sets)
|
||||||
|
for (long i = 0; i < REORDERING_WINDOW_SIZE; i++) {
|
||||||
|
exactly(3).of(transportCrypto).encodeTag(
|
||||||
|
with(any(byte[].class)), with(tagKey),
|
||||||
|
with(PROTOCOL_VERSION), with(i));
|
||||||
|
will(new EncodeTagAction(tags));
|
||||||
|
}
|
||||||
|
// Updated the transport keys (the keys are unaffected)
|
||||||
|
oneOf(transportCrypto).updateTransportKeys(transportKeys, 1000);
|
||||||
|
will(returnValue(transportKeys));
|
||||||
|
// Save the keys
|
||||||
|
oneOf(db).addTransportKeys(txn, contactId, transportKeys);
|
||||||
|
will(returnValue(keySetId));
|
||||||
|
// Encode a new tag after sliding the window
|
||||||
|
oneOf(transportCrypto).encodeTag(with(any(byte[].class)),
|
||||||
|
with(tagKey), with(PROTOCOL_VERSION),
|
||||||
|
with((long) REORDERING_WINDOW_SIZE));
|
||||||
|
will(new EncodeTagAction(tags));
|
||||||
|
// Save the reordering window (previous time period, base 1)
|
||||||
|
oneOf(db).setReorderingWindow(txn, keySetId, transportId, 999,
|
||||||
|
1, new byte[REORDERING_WINDOW_SIZE / 8]);
|
||||||
|
}});
|
||||||
|
|
||||||
|
// The timestamp is at the start of time period 1000
|
||||||
|
long timestamp = timePeriodLength * 1000;
|
||||||
|
assertEquals(keySetId, transportKeyManager.addRotationKeys(
|
||||||
|
txn, contactId, rootKey, timestamp, alice, true));
|
||||||
|
assertTrue(transportKeyManager.canSendOutgoingStreams(contactId));
|
||||||
|
// Use the first tag (previous time period, stream number 0)
|
||||||
|
assertEquals(REORDERING_WINDOW_SIZE * 3, tags.size());
|
||||||
|
byte[] tag = tags.get(0);
|
||||||
|
// Repeated request should return same stream context
|
||||||
|
StreamContext ctx = transportKeyManager.getStreamContextOnly(txn, tag);
|
||||||
|
assertNotNull(ctx);
|
||||||
|
assertEquals(contactId, ctx.getContactId());
|
||||||
|
assertEquals(transportId, ctx.getTransportId());
|
||||||
|
assertEquals(tagKey, ctx.getTagKey());
|
||||||
|
assertEquals(headerKey, ctx.getHeaderKey());
|
||||||
|
assertEquals(0L, ctx.getStreamNumber());
|
||||||
|
ctx = transportKeyManager.getStreamContextOnly(txn, tag);
|
||||||
|
assertNotNull(ctx);
|
||||||
|
assertEquals(contactId, ctx.getContactId());
|
||||||
|
assertEquals(transportId, ctx.getTransportId());
|
||||||
|
assertEquals(tagKey, ctx.getTagKey());
|
||||||
|
assertEquals(headerKey, ctx.getHeaderKey());
|
||||||
|
assertEquals(0L, ctx.getStreamNumber());
|
||||||
|
// Then mark tag as recognised
|
||||||
|
transportKeyManager.markTagAsRecognised(txn, tag);
|
||||||
|
// Another tag should have been encoded
|
||||||
|
assertEquals(REORDERING_WINDOW_SIZE * 3 + 1, tags.size());
|
||||||
|
// Finally ensure the used tag is not recognised again
|
||||||
|
assertNull(transportKeyManager.getStreamContextOnly(txn, tag));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testKeysAreUpdatedToCurrentPeriod() throws Exception {
|
public void testKeysAreUpdatedToCurrentPeriod() throws Exception {
|
||||||
TransportKeys transportKeys = createTransportKeys(1000, 0, true);
|
TransportKeys transportKeys = createTransportKeys(1000, 0, true);
|
||||||
|
|||||||
Reference in New Issue
Block a user