Only run the rendezvous polling task when we have pending contacts.

This commit is contained in:
akwizgran
2020-08-14 14:47:37 +01:00
parent c75c8da4b9
commit a8fe0a01ac
2 changed files with 72 additions and 44 deletions

View File

@@ -42,6 +42,7 @@ import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedE
import org.briarproject.bramble.api.rendezvous.event.RendezvousPollEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.api.system.Wakeful;
import java.security.GeneralSecurityException;
@@ -68,6 +69,7 @@ import static org.briarproject.bramble.api.contact.PendingContactState.ADDING_CO
import static org.briarproject.bramble.api.contact.PendingContactState.FAILED;
import static org.briarproject.bramble.api.contact.PendingContactState.OFFLINE;
import static org.briarproject.bramble.api.contact.PendingContactState.WAITING_FOR_CONNECTION;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNull;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.POLLING_INTERVAL_MS;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.RENDEZVOUS_TIMEOUT_MS;
@@ -102,6 +104,8 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
new HashMap<>();
@Nullable
private KeyPair handshakeKeyPair = null;
@Nullable
private Cancellable pollTask = null;
@Inject
RendezvousPollerImpl(@IoExecutor Executor ioExecutor,
@@ -144,8 +148,6 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
} catch (DbException e) {
throw new ServiceException(e);
}
scheduler.scheduleWithFixedDelay(this::poll, worker,
POLLING_INTERVAL_MS, POLLING_INTERVAL_MS, MILLISECONDS);
}
@EventExecutor
@@ -186,6 +188,12 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
}
if (cs.numEndpoints == 0) broadcastState(p.getId(), OFFLINE);
else broadcastState(p.getId(), WAITING_FOR_CONNECTION);
if (cryptoStates.size() == 1) {
LOG.info("Starting poller");
requireNull(pollTask);
pollTask = scheduler.scheduleWithFixedDelay(this::poll, worker,
POLLING_INTERVAL_MS, POLLING_INTERVAL_MS, MILLISECONDS);
}
} catch (DbException | GeneralSecurityException e) {
logException(LOG, WARNING, e);
}
@@ -208,6 +216,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
// Worker
@Wakeful
private void poll() {
LOG.info("Polling");
removeExpiredPendingContacts();
for (PluginState ps : pluginStates.values()) poll(ps);
}
@@ -234,6 +243,11 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
RendezvousEndpoint endpoint = ps.endpoints.remove(p);
if (endpoint != null) tryToClose(endpoint, LOG, INFO);
}
if (cryptoStates.isEmpty()) {
LOG.info("Stopping poller");
requireNonNull(pollTask).cancel();
pollTask = null;
}
}
// Worker

View File

@@ -27,6 +27,7 @@ import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedE
import org.briarproject.bramble.api.rendezvous.event.RendezvousPollEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.TaskScheduler.Cancellable;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.briarproject.bramble.test.CaptureArgumentAction;
import org.briarproject.bramble.test.DbExpectations;
@@ -79,6 +80,7 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.mock(KeyMaterialSource.class);
private final RendezvousEndpoint rendezvousEndpoint =
context.mock(RendezvousEndpoint.class);
private final Cancellable cancellable = context.mock(Cancellable.class);
private final Executor ioExecutor = new ImmediateExecutor();
private final PendingContact pendingContact = getPendingContact();
@@ -107,7 +109,7 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
long beforeExpiry = pendingContact.getTimestamp()
+ RENDEZVOUS_TIMEOUT_MS - 1000;
long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS;
AtomicReference<Runnable> capturePollTask = new AtomicReference<>();
AtomicReference<Runnable> capturePollTask;
// Start the service
context.checking(new DbExpectations() {{
@@ -121,21 +123,17 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == OFFLINE)));
// Capture the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});
expectDeriveRendezvousKey();
capturePollTask = expectSchedulePolling();
rendezvousPoller.startService();
context.assertIsSatisfied();
// Run the poll task - pending contact expires
// Run the poll task - pending contact expires, polling is cancelled
expectPendingContactExpires(afterExpiry);
expectCancelPolling();
capturePollTask.get().run();
}
@@ -157,10 +155,6 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == FAILED)));
// Schedule the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
}});
rendezvousPoller.startService();
@@ -183,7 +177,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
rendezvousPoller.eventOccurred(new TransportActiveEvent(transportId));
context.assertIsSatisfied();
// Add the pending contact - endpoint should be created and polled
// Add the pending contact - endpoint should be created and polled,
// polling should be scheduled
expectAddPendingContact(beforeExpiry, WAITING_FOR_CONNECTION);
expectDeriveRendezvousKey();
expectCreateEndpoint();
@@ -200,12 +195,16 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
any(ConnectionHandler.class)))));
}});
expectSchedulePolling();
rendezvousPoller.eventOccurred(
new PendingContactAddedEvent(pendingContact));
context.assertIsSatisfied();
// Remove the pending contact - endpoint should be closed
// Remove the pending contact - endpoint should be closed,
// polling should be cancelled
expectCloseEndpoint();
expectCancelPolling();
rendezvousPoller.eventOccurred(
new PendingContactRemovedEvent(pendingContact.getId()));
@@ -221,10 +220,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
long beforeExpiry = pendingContact.getTimestamp()
+ RENDEZVOUS_TIMEOUT_MS - 1000;
long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS;
AtomicReference<Runnable> capturePollTask;
// Start the service, capturing the poll task
AtomicReference<Runnable> capturePollTask =
expectStartupWithNoPendingContacts();
// Start the service
expectStartupWithNoPendingContacts();
rendezvousPoller.startService();
context.assertIsSatisfied();
@@ -235,7 +234,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
rendezvousPoller.eventOccurred(new TransportActiveEvent(transportId));
context.assertIsSatisfied();
// Add the pending contact - endpoint should be created and polled
// Add the pending contact - endpoint should be created and polled,
// polling should be scheduled
expectAddPendingContact(beforeExpiry, WAITING_FOR_CONNECTION);
expectDeriveRendezvousKey();
expectCreateEndpoint();
@@ -252,13 +252,17 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
any(ConnectionHandler.class)))));
}});
capturePollTask = expectSchedulePolling();
rendezvousPoller.eventOccurred(
new PendingContactAddedEvent(pendingContact));
context.assertIsSatisfied();
// Run the poll task - pending contact expires, endpoint is closed
// Run the poll task - pending contact expires, endpoint is closed,
// polling is cancelled
expectPendingContactExpires(afterExpiry);
expectCloseEndpoint();
expectCancelPolling();
capturePollTask.get().run();
context.assertIsSatisfied();
@@ -286,6 +290,7 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
// Add the pending contact - no endpoints should be created yet
expectAddPendingContact(beforeExpiry, OFFLINE);
expectDeriveRendezvousKey();
expectSchedulePolling();
rendezvousPoller.eventOccurred(
new PendingContactAddedEvent(pendingContact));
@@ -307,6 +312,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Remove the pending contact - endpoint is already closed
expectCancelPolling();
rendezvousPoller.eventOccurred(
new PendingContactRemovedEvent(pendingContact.getId()));
}
@@ -348,8 +355,9 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
rendezvousPoller.startService();
context.assertIsSatisfied();
// Run the poll task - pending contact expires
// Run the poll task - pending contact expires, polling is cancelled
expectPendingContactExpires(afterExpiry);
expectCancelPolling();
capturePollTask.get().run();
context.assertIsSatisfied();
@@ -385,8 +393,9 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
new RendezvousConnectionOpenedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Run the poll task - pending contact expires
// Run the poll task - pending contact expires, polling is cancelled
expectPendingContactExpires(afterExpiry);
expectCancelPolling();
capturePollTask.get().run();
context.assertIsSatisfied();
@@ -417,8 +426,9 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
new RendezvousConnectionOpenedEvent(pendingContact.getId()));
context.assertIsSatisfied();
// Run the poll task - pending contact expires
// Run the poll task - pending contact expires, polling is cancelled
expectPendingContactExpires(afterExpiry);
expectCancelPolling();
capturePollTask.get().run();
context.assertIsSatisfied();
@@ -447,6 +457,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Pending contact is removed - no event should be broadcast
expectCancelPolling();
rendezvousPoller.eventOccurred(
new PendingContactRemovedEvent(pendingContact.getId()));
context.assertIsSatisfied();
@@ -456,25 +468,35 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
pendingContact.getId(), false));
}
private AtomicReference<Runnable> expectStartupWithNoPendingContacts()
throws Exception {
Transaction txn = new Transaction(null, true);
private AtomicReference<Runnable> expectSchedulePolling() {
AtomicReference<Runnable> capturePollTask = new AtomicReference<>();
context.checking(new Expectations() {{
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
will(doAll(new CaptureArgumentAction<>(capturePollTask,
Runnable.class, 0), returnValue(cancellable)));
}});
return capturePollTask;
}
private void expectCancelPolling() {
context.checking(new Expectations() {{
oneOf(cancellable).cancel();
}});
}
private void expectStartupWithNoPendingContacts() throws Exception {
Transaction txn = new Transaction(null, true);
context.checking(new DbExpectations() {{
// Load the pending contacts
oneOf(db).transaction(with(true), withDbRunnable(txn));
oneOf(db).getPendingContacts(txn);
will(returnValue(emptyList()));
// Capture the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});
return capturePollTask;
}
private void expectAddPendingContact(long now,
@@ -530,7 +552,6 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
private AtomicReference<Runnable> expectStartupWithPendingContact(long now)
throws Exception {
Transaction txn = new Transaction(null, true);
AtomicReference<Runnable> capturePollTask = new AtomicReference<>();
context.checking(new DbExpectations() {{
// Load the pending contacts
@@ -543,17 +564,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == OFFLINE)));
// Capture the poll task
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(any(Executor.class)), with(POLLING_INTERVAL_MS),
with(POLLING_INTERVAL_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});
expectDeriveRendezvousKey();
return capturePollTask;
return expectSchedulePolling();
}
private void expectPendingContactExpires(long now) {