Move pending contact events to rendezvous poller.

This commit is contained in:
akwizgran
2019-06-06 16:39:54 +01:00
parent 15d9ff1ebd
commit fe1df6dafa
9 changed files with 348 additions and 266 deletions

View File

@@ -5,26 +5,20 @@ import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContact;
import org.briarproject.bramble.api.contact.PendingContactState;
import org.briarproject.bramble.api.contact.event.PendingContactStateChangedEvent;
import org.briarproject.bramble.api.crypto.KeyPair;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.NoSuchContactException;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.identity.Author;
import org.briarproject.bramble.api.identity.AuthorId;
import org.briarproject.bramble.api.identity.AuthorInfo;
import org.briarproject.bramble.api.identity.IdentityManager;
import org.briarproject.bramble.api.identity.LocalAuthor;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousFailedEvent;
import org.briarproject.bramble.api.transport.KeyManager;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.DbExpectations;
import org.briarproject.bramble.test.PredicateMatcher;
import org.jmock.Expectations;
import org.junit.Before;
import org.junit.Test;
@@ -35,8 +29,6 @@ import java.util.Random;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.briarproject.bramble.api.contact.HandshakeLinkConstants.BASE32_LINK_BYTES;
import static org.briarproject.bramble.api.contact.PendingContactState.ADDING_CONTACT;
import static org.briarproject.bramble.api.contact.PendingContactState.FAILED;
import static org.briarproject.bramble.api.contact.PendingContactState.WAITING_FOR_CONNECTION;
import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH;
import static org.briarproject.bramble.api.identity.AuthorInfo.Status.OURSELVES;
@@ -65,7 +57,6 @@ public class ContactManagerImplTest extends BrambleMockTestCase {
context.mock(IdentityManager.class);
private final PendingContactFactory pendingContactFactory =
context.mock(PendingContactFactory.class);
private final EventBus eventBus = context.mock(EventBus.class);
private final Author remote = getAuthor();
private final LocalAuthor localAuthor = getLocalAuthor();
@@ -85,7 +76,7 @@ public class ContactManagerImplTest extends BrambleMockTestCase {
@Before
public void setUp() {
contactManager = new ContactManagerImpl(db, keyManager,
identityManager, pendingContactFactory, eventBus);
identityManager, pendingContactFactory);
}
@Test
@@ -328,143 +319,4 @@ public class ContactManagerImplTest extends BrambleMockTestCase {
assertEquals(WAITING_FOR_CONNECTION, pair.getSecond());
}
@Test
public void testPendingContactExpiresBeforeConnection() {
// The pending contact expires - the FAILED state is broadcast
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == FAILED)));
}});
contactManager.eventOccurred(new RendezvousFailedEvent(
pendingContact.getId()));
context.assertIsSatisfied();
// A rendezvous connection is opened - no state is broadcast
contactManager.eventOccurred(new RendezvousConnectionOpenedEvent(
pendingContact.getId()));
context.assertIsSatisfied();
// The rendezvous connection fails - no state is broadcast
contactManager.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
}
@Test
public void testPendingContactExpiresDuringFailedConnection() {
// A rendezvous connection is opened - the ADDING_CONTACT state is
// broadcast
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == ADDING_CONTACT)));
}});
contactManager.eventOccurred(new RendezvousConnectionOpenedEvent(
pendingContact.getId()));
context.assertIsSatisfied();
// The pending contact expires - the FAILED state is broadcast
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == FAILED)));
}});
contactManager.eventOccurred(new RendezvousFailedEvent(
pendingContact.getId()));
context.assertIsSatisfied();
// The rendezvous connection fails - no state is broadcast
contactManager.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
}
@Test
public void testPendingContactExpiresDuringSuccessfulConnection()
throws Exception {
Transaction txn = new Transaction(null, false);
// A rendezvous connection is opened - the ADDING_CONTACT state is
// broadcast
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == ADDING_CONTACT)));
}});
contactManager.eventOccurred(new RendezvousConnectionOpenedEvent(
pendingContact.getId()));
context.assertIsSatisfied();
// The pending contact expires - the FAILED state is broadcast
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == FAILED)));
}});
contactManager.eventOccurred(new RendezvousFailedEvent(
pendingContact.getId()));
context.assertIsSatisfied();
// The pending contact is converted to a contact - no state is broadcast
context.checking(new DbExpectations() {{
oneOf(db).getPendingContact(txn, pendingContact.getId());
will(returnValue(pendingContact));
oneOf(db).removePendingContact(txn, pendingContact.getId());
oneOf(db).addContact(txn, remote, local,
pendingContact.getPublicKey(), verified);
will(returnValue(contactId));
oneOf(db).setContactAlias(txn, contactId,
pendingContact.getAlias());
oneOf(identityManager).getHandshakeKeys(txn);
will(returnValue(handshakeKeyPair));
oneOf(keyManager).addContact(txn, contactId,
pendingContact.getPublicKey(), handshakeKeyPair);
oneOf(keyManager).addRotationKeys(txn, contactId, rootKey,
timestamp, alice, active);
oneOf(db).getContact(txn, contactId);
will(returnValue(contact));
}});
contactManager.addContact(txn, pendingContact.getId(), remote,
local, rootKey, timestamp, alice, verified, active);
context.assertIsSatisfied();
// The rendezvous connection succeeds - no state is broadcast
contactManager.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), true));
}
@Test
public void testPendingContactRemovedDuringFailedConnection()
throws Exception {
Transaction txn = new Transaction(null, false);
// A rendezvous connection is opened - the ADDING_CONTACT state is
// broadcast
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == ADDING_CONTACT)));
}});
contactManager.eventOccurred(new RendezvousConnectionOpenedEvent(
pendingContact.getId()));
context.assertIsSatisfied();
// The pending contact is removed - no state is broadcast
context.checking(new DbExpectations() {{
oneOf(db).transaction(with(false), withDbRunnable(txn));
oneOf(db).removePendingContact(txn, pendingContact.getId());
}});
contactManager.removePendingContact(pendingContact.getId());
context.assertIsSatisfied();
// The rendezvous connection fails - no state is broadcast
contactManager.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
}
}

View File

@@ -1,8 +1,10 @@
package org.briarproject.bramble.rendezvous;
import org.briarproject.bramble.api.contact.PendingContact;
import org.briarproject.bramble.api.contact.PendingContactState;
import org.briarproject.bramble.api.contact.event.PendingContactAddedEvent;
import org.briarproject.bramble.api.contact.event.PendingContactRemovedEvent;
import org.briarproject.bramble.api.contact.event.PendingContactStateChangedEvent;
import org.briarproject.bramble.api.crypto.KeyPair;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.crypto.TransportCrypto;
@@ -20,12 +22,14 @@ import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
import org.briarproject.bramble.api.rendezvous.RendezvousEndpoint;
import org.briarproject.bramble.api.rendezvous.event.RendezvousFailedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.CaptureArgumentAction;
import org.briarproject.bramble.test.DbExpectations;
import org.briarproject.bramble.test.ImmediateExecutor;
import org.briarproject.bramble.test.PredicateMatcher;
import org.jmock.Expectations;
import org.junit.Before;
import org.junit.Test;
@@ -38,6 +42,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.briarproject.bramble.api.contact.PendingContactState.ADDING_CONTACT;
import static org.briarproject.bramble.api.contact.PendingContactState.FAILED;
import static org.briarproject.bramble.api.contact.PendingContactState.WAITING_FOR_CONNECTION;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.POLLING_INTERVAL_MS;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.RENDEZVOUS_TIMEOUT_MS;
import static org.briarproject.bramble.test.CollectionMatcher.collectionOf;
@@ -110,7 +117,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
// The pending contact has not expired
oneOf(clock).currentTimeMillis();
will(returnValue(beforeExpiry));
// Capture the poll task, we'll run it later
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == WAITING_FOR_CONNECTION)));
// Capture the poll task
oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
@@ -124,11 +134,7 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Run the poll task - pending contact expires
context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(afterExpiry));
oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class)));
}});
expectPendingContactExpires(afterExpiry);
capturePollTask.get().run();
}
@@ -147,7 +153,9 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
// The pending contact has already expired
oneOf(clock).currentTimeMillis();
will(returnValue(atExpiry));
oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class)));
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == FAILED)));
// Schedule the poll task
oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
@@ -175,11 +183,7 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Add the pending contact - endpoint should be created and polled
context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(beforeExpiry));
}});
expectAddUnexpiredPendingContact(beforeExpiry);
expectDeriveRendezvousKey();
expectCreateEndpoint();
@@ -230,11 +234,7 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Add the pending contact - endpoint should be created and polled
context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(beforeExpiry));
}});
expectAddUnexpiredPendingContact(beforeExpiry);
expectDeriveRendezvousKey();
expectCreateEndpoint();
@@ -252,11 +252,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Run the poll task - pending contact expires, endpoint is closed
expectPendingContactExpires(afterExpiry);
context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(afterExpiry));
oneOf(rendezvousEndpoint).close();
oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class)));
}});
capturePollTask.get().run();
@@ -283,11 +282,7 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Add the pending contact - no endpoints should be created yet
context.checking(new DbExpectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(beforeExpiry));
}});
expectAddUnexpiredPendingContact(beforeExpiry);
expectDeriveRendezvousKey();
rendezvousPoller.eventOccurred(
@@ -314,15 +309,162 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
new PendingContactRemovedEvent(pendingContact.getId()));
}
@Test
public void testRendezvousConnectionEvents() throws Exception {
long beforeExpiry = pendingContact.getTimestamp();
// Start the service
expectStartupWithPendingContact(beforeExpiry);
rendezvousPoller.startService();
context.assertIsSatisfied();
// Connection is opened - event should be broadcast
expectStateChangedEvent(ADDING_CONTACT);
rendezvousPoller.eventOccurred(
new RendezvousConnectionOpenedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Connection fails - event should be broadcast
expectStateChangedEvent(WAITING_FOR_CONNECTION);
rendezvousPoller.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
}
@Test
public void testPendingContactExpiresBeforeConnection() throws Exception {
long beforeExpiry = pendingContact.getTimestamp()
+ RENDEZVOUS_TIMEOUT_MS - 1000;
long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS;
// Start the service, capturing the poll task
AtomicReference<Runnable> capturePollTask =
expectStartupWithPendingContact(beforeExpiry);
rendezvousPoller.startService();
context.assertIsSatisfied();
// Run the poll task - pending contact expires
expectPendingContactExpires(afterExpiry);
capturePollTask.get().run();
context.assertIsSatisfied();
// Connection is opened - no event should be broadcast
rendezvousPoller.eventOccurred(
new RendezvousConnectionOpenedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Connection fails - no event should be broadcast
rendezvousPoller.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
}
@Test
public void testPendingContactExpiresDuringFailedConnection()
throws Exception {
long beforeExpiry = pendingContact.getTimestamp()
+ RENDEZVOUS_TIMEOUT_MS - 1000;
long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS;
// Start the service, capturing the poll task
AtomicReference<Runnable> capturePollTask =
expectStartupWithPendingContact(beforeExpiry);
rendezvousPoller.startService();
context.assertIsSatisfied();
// Connection is opened - event should be broadcast
expectStateChangedEvent(ADDING_CONTACT);
rendezvousPoller.eventOccurred(
new RendezvousConnectionOpenedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Run the poll task - pending contact expires
expectPendingContactExpires(afterExpiry);
capturePollTask.get().run();
context.assertIsSatisfied();
// Connection fails - no event should be broadcast
rendezvousPoller.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
}
@Test
public void testPendingContactExpiresDuringSuccessfulConnection()
throws Exception {
long beforeExpiry = pendingContact.getTimestamp()
+ RENDEZVOUS_TIMEOUT_MS - 1000;
long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS;
// Start the service, capturing the poll task
AtomicReference<Runnable> capturePollTask =
expectStartupWithPendingContact(beforeExpiry);
rendezvousPoller.startService();
context.assertIsSatisfied();
// Connection is opened - event should be broadcast
expectStateChangedEvent(ADDING_CONTACT);
rendezvousPoller.eventOccurred(
new RendezvousConnectionOpenedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Run the poll task - pending contact expires
expectPendingContactExpires(afterExpiry);
capturePollTask.get().run();
context.assertIsSatisfied();
// Pending contact is removed - no event should be broadcast
rendezvousPoller.eventOccurred(
new PendingContactRemovedEvent(pendingContact.getId()));
}
@Test
public void testPendingContactRemovedDuringFailedConnection()
throws Exception {
long beforeExpiry = pendingContact.getTimestamp();
// Start the service
expectStartupWithPendingContact(beforeExpiry);
rendezvousPoller.startService();
context.assertIsSatisfied();
// Connection is opened - event should be broadcast
expectStateChangedEvent(ADDING_CONTACT);
rendezvousPoller.eventOccurred(
new RendezvousConnectionOpenedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Pending contact is removed - no event should be broadcast
rendezvousPoller.eventOccurred(
new PendingContactRemovedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Connection fails - no event should be broadcast
rendezvousPoller.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
}
private AtomicReference<Runnable> expectStartupWithNoPendingContacts()
throws Exception {
Transaction txn = new Transaction(null, true);
AtomicReference<Runnable> capturePollTask = new AtomicReference<>();
context.checking(new DbExpectations() {{
// Load the pending contacts
oneOf(db).transaction(with(true), withDbRunnable(txn));
oneOf(db).getPendingContacts(txn);
will(returnValue(emptyList()));
// Capture the poll task
oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
@@ -333,6 +475,16 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
return capturePollTask;
}
private void expectAddUnexpiredPendingContact(long now) {
context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == WAITING_FOR_CONNECTION)));
}});
}
private void expectDeriveRendezvousKey() throws Exception {
Transaction txn = new Transaction(null, true);
@@ -371,4 +523,50 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
will(returnValue(transportId));
}});
}
private AtomicReference<Runnable> expectStartupWithPendingContact(long now)
throws Exception {
Transaction txn = new Transaction(null, true);
AtomicReference<Runnable> capturePollTask = new AtomicReference<>();
context.checking(new DbExpectations() {{
// Load the pending contacts
oneOf(db).transaction(with(true), withDbRunnable(txn));
oneOf(db).getPendingContacts(txn);
will(returnValue(singletonList(pendingContact)));
// The pending contact has not expired
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == WAITING_FOR_CONNECTION)));
// Capture the poll task
oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});
expectDeriveRendezvousKey();
return capturePollTask;
}
private void expectPendingContactExpires(long now) {
context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(now));
}});
expectStateChangedEvent(FAILED);
}
private void expectStateChangedEvent(PendingContactState state) {
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == state)));
}});
}
}