Only check timeouts when we have some streams to monitor.

This commit is contained in:
akwizgran
2020-05-08 14:35:50 +01:00
parent f2f278c393
commit d3fd309609

View File

@@ -7,12 +7,14 @@ import org.briarproject.bramble.api.system.Scheduler;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject; import javax.inject.Inject;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -28,35 +30,58 @@ class TimeoutMonitorImpl implements TimeoutMonitor {
private static final long CHECK_INTERVAL_MS = SECONDS.toMillis(10); private static final long CHECK_INTERVAL_MS = SECONDS.toMillis(10);
private final ScheduledExecutorService scheduler;
private final Executor ioExecutor; private final Executor ioExecutor;
private final Clock clock; private final Clock clock;
private final Object lock = new Object();
@GuardedBy("lock")
private final List<TimeoutInputStream> streams = new ArrayList<>();
private final List<TimeoutInputStream> streams = @GuardedBy("lock")
new CopyOnWriteArrayList<>(); private Future<?> task = null;
@Inject @Inject
TimeoutMonitorImpl(@IoExecutor Executor ioExecutor, Clock clock, TimeoutMonitorImpl(@Scheduler ScheduledExecutorService scheduler,
@Scheduler ScheduledExecutorService scheduler) { @IoExecutor Executor ioExecutor, Clock clock) {
this.scheduler = scheduler;
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.clock = clock; this.clock = clock;
scheduler.scheduleWithFixedDelay(this::checkTimeouts,
CHECK_INTERVAL_MS, CHECK_INTERVAL_MS, MILLISECONDS);
} }
@Override @Override
public InputStream createTimeoutInputStream(InputStream in, public InputStream createTimeoutInputStream(InputStream in,
long timeoutMs) { long timeoutMs) {
TimeoutInputStream stream = new TimeoutInputStream(clock, in, TimeoutInputStream stream = new TimeoutInputStream(clock, in,
timeoutMs * 1_000_000, streams::remove); timeoutMs * 1_000_000, this::removeStream);
streams.add(stream); synchronized (lock) {
if (streams.isEmpty()) {
task = scheduler.scheduleWithFixedDelay(this::checkTimeouts,
CHECK_INTERVAL_MS, CHECK_INTERVAL_MS, MILLISECONDS);
}
streams.add(stream);
}
return stream; return stream;
} }
private void removeStream(TimeoutInputStream stream) {
Future<?> toCancel = null;
synchronized (lock) {
if (streams.remove(stream) && streams.isEmpty()) {
toCancel = task;
task = null;
}
}
if (toCancel != null) toCancel.cancel(false);
}
@Scheduler @Scheduler
private void checkTimeouts() { private void checkTimeouts() {
ioExecutor.execute(() -> { ioExecutor.execute(() -> {
LOG.info("Checking input stream timeouts"); List<TimeoutInputStream> snapshot;
for (TimeoutInputStream stream : streams) { synchronized (lock) {
snapshot = new ArrayList<>(streams);
}
for (TimeoutInputStream stream : snapshot) {
if (stream.hasTimedOut()) { if (stream.hasTimedOut()) {
LOG.info("Input stream has timed out"); LOG.info("Input stream has timed out");
try { try {