Add recently connected state to core and UI.

This commit is contained in:
akwizgran
2020-05-04 16:08:42 +01:00
parent 8fd9a40ffb
commit d1cda4fb1e
38 changed files with 391 additions and 275 deletions

View File

@@ -6,30 +6,41 @@ import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.ConnectionStatus;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent;
import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent;
import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.plugin.event.ConnectionStatusChangedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.Scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.plugin.ConnectionStatus.CONNECTED;
import static org.briarproject.bramble.api.plugin.ConnectionStatus.DISCONNECTED;
import static org.briarproject.bramble.api.plugin.ConnectionStatus.RECENTLY_CONNECTED;
@ThreadSafe
@NotNullByDefault
@@ -38,22 +49,30 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
private static final Logger LOG =
getLogger(ConnectionRegistryImpl.class.getName());
private static final long RECENTLY_CONNECTED_MS = MINUTES.toMillis(1);
private static final long EXPIRY_INTERVAL_MS = SECONDS.toMillis(10);
private final EventBus eventBus;
private final Clock clock;
private final Object lock = new Object();
@GuardedBy("lock")
private final Map<TransportId, Multiset<ContactId>> contactConnections;
@GuardedBy("lock")
private final Multiset<ContactId> contactCounts;
private final Map<ContactId, Counter> contactCounts;
@GuardedBy("lock")
private final Set<PendingContactId> connectedPendingContacts;
@Inject
ConnectionRegistryImpl(EventBus eventBus) {
ConnectionRegistryImpl(EventBus eventBus, Clock clock,
@Scheduler ScheduledExecutorService scheduler) {
this.eventBus = eventBus;
this.clock = clock;
contactConnections = new HashMap<>();
contactCounts = new Multiset<>();
contactCounts = new HashMap<>();
connectedPendingContacts = new HashSet<>();
scheduler.scheduleWithFixedDelay(this::expireRecentConnections,
EXPIRY_INTERVAL_MS, EXPIRY_INTERVAL_MS, MILLISECONDS);
}
@Override
@@ -71,12 +90,22 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
contactConnections.put(t, m);
}
m.add(c);
if (contactCounts.add(c) == 1) firstConnection = true;
Counter counter = contactCounts.get(c);
if (counter == null) {
counter = new Counter();
contactCounts.put(c, counter);
}
if (counter.connections == 0) {
counter.disconnectedTime = 0;
firstConnection = true;
}
counter.connections++;
}
eventBus.broadcast(new ConnectionOpenedEvent(c, t, incoming));
if (firstConnection) {
LOG.info("Contact connected");
eventBus.broadcast(new ContactConnectedEvent(c));
eventBus.broadcast(new ConnectionStatusChangedEvent(c, CONNECTED));
}
}
@@ -93,12 +122,22 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
if (m == null || !m.contains(c))
throw new IllegalArgumentException();
m.remove(c);
if (contactCounts.remove(c) == 0) lastConnection = true;
Counter counter = contactCounts.get(c);
if (counter == null || counter.connections == 0) {
throw new IllegalArgumentException();
}
counter.connections--;
if (counter.connections == 0) {
counter.disconnectedTime = clock.currentTimeMillis();
lastConnection = true;
}
}
eventBus.broadcast(new ConnectionClosedEvent(c, t, incoming));
if (lastConnection) {
LOG.info("Contact disconnected");
eventBus.broadcast(new ContactDisconnectedEvent(c));
eventBus.broadcast(
new ConnectionStatusChangedEvent(c, RECENTLY_CONNECTED));
}
}
@@ -106,7 +145,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
public Collection<ContactId> getConnectedContacts(TransportId t) {
synchronized (lock) {
Multiset<ContactId> m = contactConnections.get(t);
if (m == null) return Collections.emptyList();
if (m == null) return emptyList();
List<ContactId> ids = new ArrayList<>(m.keySet());
if (LOG.isLoggable(INFO))
LOG.info(ids.size() + " contacts connected: " + t);
@@ -123,9 +162,11 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
@Override
public boolean isConnected(ContactId c) {
public ConnectionStatus getConnectionStatus(ContactId c) {
synchronized (lock) {
return contactCounts.contains(c);
Counter counter = contactCounts.get(c);
if (counter == null) return DISCONNECTED;
return counter.connections > 0 ? CONNECTED : RECENTLY_CONNECTED;
}
}
@@ -147,4 +188,36 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
eventBus.broadcast(new RendezvousConnectionClosedEvent(p, success));
}
@Scheduler
private void expireRecentConnections() {
long now = clock.currentTimeMillis();
List<ContactId> disconnected = new ArrayList<>();
synchronized (lock) {
Iterator<Entry<ContactId, Counter>> it =
contactCounts.entrySet().iterator();
while (it.hasNext()) {
Entry<ContactId, Counter> e = it.next();
if (e.getValue().isExpired(now)) {
disconnected.add(e.getKey());
it.remove();
}
}
}
for (ContactId c : disconnected) {
eventBus.broadcast(
new ConnectionStatusChangedEvent(c, DISCONNECTED));
}
}
private static class Counter {
private int connections = 0;
private long disconnectedTime = 0;
private boolean isExpired(long now) {
return connections == 0 &&
now - disconnectedTime > RECENTLY_CONNECTED_MS;
}
}
}

View File

@@ -7,18 +7,20 @@ import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent;
import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent;
import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.plugin.event.ConnectionStatusChangedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.jmock.Expectations;
import org.junit.Test;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTransportId;
@@ -30,6 +32,9 @@ import static org.junit.Assert.fail;
public class ConnectionRegistryImplTest extends BrambleMockTestCase {
private final EventBus eventBus = context.mock(EventBus.class);
private final Clock clock = context.mock(Clock.class);
private final ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class);
private final ContactId contactId = getContactId();
private final ContactId contactId1 = getContactId();
@@ -40,17 +45,25 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
@Test
public void testRegisterAndUnregister() {
ConnectionRegistry c = new ConnectionRegistryImpl(eventBus);
context.checking(new Expectations() {{
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(10_000L), with(10_000L), with(MILLISECONDS));
}});
ConnectionRegistry c = new ConnectionRegistryImpl(eventBus, clock,
scheduler);
context.assertIsSatisfied();
// The registry should be empty
assertEquals(emptyList(), c.getConnectedContacts(transportId));
assertEquals(emptyList(), c.getConnectedContacts(transportId1));
// Check that a registered connection shows up - this should
// broadcast a ConnectionOpenedEvent and a ContactConnectedEvent
// broadcast a ConnectionOpenedEvent and a ConnectionStatusChangedEvent
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionOpenedEvent.class)));
oneOf(eventBus).broadcast(with(any(ContactConnectedEvent.class)));
oneOf(eventBus).broadcast(with(any(
ConnectionStatusChangedEvent.class)));
}});
c.registerConnection(contactId, transportId, true);
assertEquals(singletonList(contactId),
@@ -81,11 +94,13 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.assertIsSatisfied();
// Unregister the other connection - this should broadcast a
// ConnectionClosedEvent and a ContactDisconnectedEvent
// ConnectionClosedEvent and a ConnectionStatusChangedEvent
context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(System.currentTimeMillis()));
oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class)));
oneOf(eventBus).broadcast(with(any(
ContactDisconnectedEvent.class)));
ConnectionStatusChangedEvent.class)));
}});
c.unregisterConnection(contactId, transportId, true);
assertEquals(emptyList(), c.getConnectedContacts(transportId));
@@ -102,12 +117,12 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
// Register both contacts with one transport, one contact with both -
// this should broadcast three ConnectionOpenedEvents and two
// ContactConnectedEvents
// ConnectionStatusChangedEvents
context.checking(new Expectations() {{
exactly(3).of(eventBus).broadcast(with(any(
ConnectionOpenedEvent.class)));
exactly(2).of(eventBus).broadcast(with(any(
ContactConnectedEvent.class)));
ConnectionStatusChangedEvent.class)));
}});
c.registerConnection(contactId, transportId, true);
c.registerConnection(contactId1, transportId, true);
@@ -122,7 +137,14 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
@Test
public void testRegisterAndUnregisterPendingContacts() {
ConnectionRegistry c = new ConnectionRegistryImpl(eventBus);
context.checking(new Expectations() {{
oneOf(scheduler).scheduleWithFixedDelay(with(any(Runnable.class)),
with(10_000L), with(10_000L), with(MILLISECONDS));
}});
ConnectionRegistry c = new ConnectionRegistryImpl(eventBus, clock,
scheduler);
context.assertIsSatisfied();
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(