Use wakeful IO executor for polling, reconnection tasks.

This commit is contained in:
akwizgran
2020-08-10 16:49:46 +01:00
parent 086c10abc0
commit 434b8a37f3
2 changed files with 20 additions and 20 deletions

View File

@@ -27,6 +27,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.TaskScheduler; import org.briarproject.bramble.api.system.TaskScheduler;
import org.briarproject.bramble.api.system.WakefulIoExecutor;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
@@ -56,7 +57,7 @@ class PollerImpl implements Poller, EventListener {
private static final Logger LOG = getLogger(PollerImpl.class.getName()); private static final Logger LOG = getLogger(PollerImpl.class.getName());
private final Executor ioExecutor; private final Executor wakefulIoExecutor;
private final TaskScheduler scheduler; private final TaskScheduler scheduler;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry; private final ConnectionRegistry connectionRegistry;
@@ -69,7 +70,7 @@ class PollerImpl implements Poller, EventListener {
private final Map<TransportId, ScheduledPollTask> tasks; private final Map<TransportId, ScheduledPollTask> tasks;
@Inject @Inject
PollerImpl(@IoExecutor Executor ioExecutor, PollerImpl(@WakefulIoExecutor Executor wakefulIoExecutor,
TaskScheduler scheduler, TaskScheduler scheduler,
ConnectionManager connectionManager, ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, ConnectionRegistry connectionRegistry,
@@ -77,7 +78,7 @@ class PollerImpl implements Poller, EventListener {
TransportPropertyManager transportPropertyManager, TransportPropertyManager transportPropertyManager,
SecureRandom random, SecureRandom random,
Clock clock) { Clock clock) {
this.ioExecutor = ioExecutor; this.wakefulIoExecutor = wakefulIoExecutor;
this.scheduler = scheduler; this.scheduler = scheduler;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry; this.connectionRegistry = connectionRegistry;
@@ -118,7 +119,6 @@ class PollerImpl implements Poller, EventListener {
} }
} }
// TODO: Make this wakeful
private void connectToContact(ContactId c) { private void connectToContact(ContactId c) {
for (SimplexPlugin s : pluginManager.getSimplexPlugins()) for (SimplexPlugin s : pluginManager.getSimplexPlugins())
if (s.shouldPoll()) connectToContact(c, s); if (s.shouldPoll()) connectToContact(c, s);
@@ -135,7 +135,7 @@ class PollerImpl implements Poller, EventListener {
} }
private void connectToContact(ContactId c, SimplexPlugin p) { private void connectToContact(ContactId c, SimplexPlugin p) {
ioExecutor.execute(() -> { wakefulIoExecutor.execute(() -> {
TransportId t = p.getId(); TransportId t = p.getId();
if (connectionRegistry.isConnected(c, t)) return; if (connectionRegistry.isConnected(c, t)) return;
try { try {
@@ -151,7 +151,7 @@ class PollerImpl implements Poller, EventListener {
} }
private void connectToContact(ContactId c, DuplexPlugin p) { private void connectToContact(ContactId c, DuplexPlugin p) {
ioExecutor.execute(() -> { wakefulIoExecutor.execute(() -> {
TransportId t = p.getId(); TransportId t = p.getId();
if (connectionRegistry.isConnected(c, t)) return; if (connectionRegistry.isConnected(c, t)) return;
try { try {
@@ -190,8 +190,8 @@ class PollerImpl implements Poller, EventListener {
// it will abort safely when it finds it's been replaced // it will abort safely when it finds it's been replaced
if (scheduled != null) scheduled.future.cancel(false); if (scheduled != null) scheduled.future.cancel(false);
PollTask task = new PollTask(p, due, randomiseNext); PollTask task = new PollTask(p, due, randomiseNext);
Future<?> future = scheduler.schedule(task, ioExecutor, delay, Future<?> future = scheduler.schedule(task, wakefulIoExecutor,
MILLISECONDS); delay, MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, future)); tasks.put(t, new ScheduledPollTask(task, future));
} }
} finally { } finally {

View File

@@ -58,7 +58,7 @@ public class PollerImplTest extends BrambleMockTestCase {
private final Future<?> future = context.mock(Future.class); private final Future<?> future = context.mock(Future.class);
private final SecureRandom random; private final SecureRandom random;
private final Executor ioExecutor = new ImmediateExecutor(); private final Executor wakefulIoExecutor = new ImmediateExecutor();
private final TransportId transportId = getTransportId(); private final TransportId transportId = getTransportId();
private final ContactId contactId = getContactId(); private final ContactId contactId = getContactId();
private final TransportProperties properties = new TransportProperties(); private final TransportProperties properties = new TransportProperties();
@@ -74,7 +74,7 @@ public class PollerImplTest extends BrambleMockTestCase {
@Before @Before
public void setUp() { public void setUp() {
poller = new PollerImpl(ioExecutor, scheduler, connectionManager, poller = new PollerImpl(wakefulIoExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, transportPropertyManager, connectionRegistry, pluginManager, transportPropertyManager,
random, clock); random, clock);
} }
@@ -234,7 +234,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval), with(wakefulIoExecutor), with((long) pollingInterval),
with(MILLISECONDS)); with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
}}); }});
@@ -263,7 +263,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval), with(wakefulIoExecutor), with((long) pollingInterval),
with(MILLISECONDS)); with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
// Second event // Second event
@@ -306,7 +306,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval), with(wakefulIoExecutor), with((long) pollingInterval),
with(MILLISECONDS)); with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
// Second event // Second event
@@ -323,7 +323,7 @@ public class PollerImplTest extends BrambleMockTestCase {
will(returnValue(now + 1)); will(returnValue(now + 1));
oneOf(future).cancel(false); oneOf(future).cancel(false);
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval - 2), with(wakefulIoExecutor), with((long) pollingInterval - 2),
with(MILLISECONDS)); with(MILLISECONDS));
}}); }});
@@ -350,7 +350,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS)); with(wakefulIoExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
will(new RunAction()); will(new RunAction());
// Running the polling task schedules the next polling task // Running the polling task schedules the next polling task
@@ -361,7 +361,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) (pollingInterval * 0.5)), with(wakefulIoExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS)); with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
// Get the transport properties and connected contacts // Get the transport properties and connected contacts
@@ -394,7 +394,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS)); with(wakefulIoExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
will(new RunAction()); will(new RunAction());
// Running the polling task schedules the next polling task // Running the polling task schedules the next polling task
@@ -405,7 +405,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) (pollingInterval * 0.5)), with(wakefulIoExecutor), with((long) (pollingInterval * 0.5)),
with(MILLISECONDS)); with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
// Get the transport properties and connected contacts // Get the transport properties and connected contacts
@@ -436,7 +436,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with(0L), with(MILLISECONDS)); with(wakefulIoExecutor), with(0L), with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
// The plugin is deactivated before the task runs - cancel the task // The plugin is deactivated before the task runs - cancel the task
oneOf(future).cancel(false); oneOf(future).cancel(false);
@@ -460,7 +460,7 @@ public class PollerImplTest extends BrambleMockTestCase {
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(ioExecutor), with((long) pollingInterval), with(wakefulIoExecutor), with((long) pollingInterval),
with(MILLISECONDS)); with(MILLISECONDS));
will(returnValue(future)); will(returnValue(future));
}}); }});