diff --git a/bramble-core/src/main/java/org/briarproject/bramble/PriorityExecutor.java b/bramble-core/src/main/java/org/briarproject/bramble/PriorityExecutor.java new file mode 100644 index 000000000..153beb136 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/PriorityExecutor.java @@ -0,0 +1,67 @@ +package org.briarproject.bramble; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; + +import javax.annotation.concurrent.GuardedBy; + +@NotNullByDefault +public class PriorityExecutor { + + private final Object lock = new Object(); + private final Executor delegate, high, low; + @GuardedBy("lock") + private final Queue highQueue = new LinkedList<>(); + @GuardedBy("lock") + private final Queue lowQueue = new LinkedList<>(); + + @GuardedBy("lock") + private boolean isTaskRunning = false; + + public PriorityExecutor(Executor delegate) { + this.delegate = delegate; + high = r -> submit(r, true); + low = r -> submit(r, false); + } + + public Executor getHighPriorityExecutor() { + return high; + } + + public Executor getLowPriorityExecutor() { + return low; + } + + private void submit(Runnable r, boolean isHighPriority) { + Runnable wrapped = () -> { + try { + r.run(); + } finally { + scheduleNext(); + } + }; + synchronized (lock) { + if (!isTaskRunning && highQueue.isEmpty() && + (isHighPriority || lowQueue.isEmpty())) { + isTaskRunning = true; + delegate.execute(wrapped); + } else if (isHighPriority) { + highQueue.add(wrapped); + } else { + lowQueue.add(wrapped); + } + } + } + + private void scheduleNext() { + synchronized (lock) { + Runnable next = highQueue.poll(); + if (next == null) next = lowQueue.poll(); + if (next == null) isTaskRunning = false; + else delegate.execute(next); + } + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/PriorityExecutorTest.java b/bramble-core/src/test/java/org/briarproject/bramble/PriorityExecutorTest.java new file mode 100644 index 000000000..055848546 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/PriorityExecutorTest.java @@ -0,0 +1,86 @@ +package org.briarproject.bramble; + +import org.briarproject.bramble.test.BrambleTestCase; +import org.junit.Test; + +import java.util.List; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; + +import static java.util.Arrays.asList; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class PriorityExecutorTest extends BrambleTestCase { + + @Test + public void testHighPriorityTasksAreDelegatedInOrderOfSubmission() + throws Exception { + Executor delegate = newSingleThreadExecutor(); + PriorityExecutor priority = new PriorityExecutor(delegate); + Executor high = priority.getHighPriorityExecutor(); + testTasksAreDelegatedInOrderOfSubmission(high); + } + + @Test + public void testLowPriorityTasksAreDelegatedInOrderOfSubmission() + throws Exception { + Executor delegate = newSingleThreadExecutor(); + PriorityExecutor priority = new PriorityExecutor(delegate); + Executor low = priority.getLowPriorityExecutor(); + testTasksAreDelegatedInOrderOfSubmission(low); + } + + @Test + public void testHighPriorityTasksAreRunFirst() throws Exception { + Executor delegate = newSingleThreadExecutor(); + PriorityExecutor priority = new PriorityExecutor(delegate); + Executor high = priority.getHighPriorityExecutor(); + Executor low = priority.getLowPriorityExecutor(); + // Submit a task that will block, causing other tasks to be queued + CountDownLatch cork = new CountDownLatch(1); + low.execute(() -> { + try { + cork.await(); + } catch (InterruptedException e) { + fail(); + } + }); + // Submit alternating tasks to the high and low priority executors + List results = new Vector<>(); + CountDownLatch tasksFinished = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + int result = i; + Runnable task = () -> { + results.add(result); + tasksFinished.countDown(); + }; + if (i % 2 == 0) high.execute(task); + else low.execute(task); + } + // Release the cork and wait for all tasks to finish + cork.countDown(); + tasksFinished.await(); + // The high-priority tasks should have run before the low-priority tasks + assertEquals(asList(0, 2, 4, 6, 8, 1, 3, 5, 7, 9), results); + } + + private void testTasksAreDelegatedInOrderOfSubmission(Executor e) + throws Exception { + List results = new Vector<>(); + CountDownLatch tasksFinished = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + int result = i; + e.execute(() -> { + results.add(result); + tasksFinished.countDown(); + }); + } + // Wait for all the tasks to finish + tasksFinished.await(); + // The tasks should have run in the order they were submitted + assertEquals(asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), results); + } +}