Add support for pending contacts to connection registry.

This commit is contained in:
akwizgran
2019-05-28 13:49:49 +01:00
parent 015f5005d0
commit 8bd4278ae5
5 changed files with 233 additions and 78 deletions

View File

@@ -2,6 +2,7 @@ package org.briarproject.bramble.plugin;
import org.briarproject.bramble.api.Multiset;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
@@ -10,41 +11,49 @@ import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent;
import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent;
import org.briarproject.bramble.api.plugin.event.ContactConnectedEvent;
import org.briarproject.bramble.api.plugin.event.ContactDisconnectedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionClosedEvent;
import org.briarproject.bramble.api.rendezvous.event.RendezvousConnectionOpenedEvent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Set;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.util.logging.Level.INFO;
import static java.util.logging.Logger.getLogger;
@ThreadSafe
@NotNullByDefault
class ConnectionRegistryImpl implements ConnectionRegistry {
private static final Logger LOG =
Logger.getLogger(ConnectionRegistryImpl.class.getName());
getLogger(ConnectionRegistryImpl.class.getName());
private final EventBus eventBus;
private final Lock lock = new ReentrantLock();
// The following are locking: lock
private final Map<TransportId, Multiset<ContactId>> connections;
private final Object lock = new Object();
@GuardedBy("lock")
private final Map<TransportId, Multiset<ContactId>> contactConnections;
@GuardedBy("lock")
private final Multiset<ContactId> contactCounts;
@GuardedBy("lock")
private final Set<PendingContactId> connectedPendingContacts;
@Inject
ConnectionRegistryImpl(EventBus eventBus) {
this.eventBus = eventBus;
connections = new HashMap<>();
contactConnections = new HashMap<>();
contactCounts = new Multiset<>();
connectedPendingContacts = new HashSet<>();
}
@Override
@@ -55,17 +64,14 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
else LOG.info("Outgoing connection registered: " + t);
}
boolean firstConnection = false;
lock.lock();
try {
Multiset<ContactId> m = connections.get(t);
synchronized (lock) {
Multiset<ContactId> m = contactConnections.get(t);
if (m == null) {
m = new Multiset<>();
connections.put(t, m);
contactConnections.put(t, m);
}
m.add(c);
if (contactCounts.add(c) == 1) firstConnection = true;
} finally {
lock.unlock();
}
eventBus.broadcast(new ConnectionOpenedEvent(c, t, incoming));
if (firstConnection) {
@@ -82,14 +88,12 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
else LOG.info("Outgoing connection unregistered: " + t);
}
boolean lastConnection = false;
lock.lock();
try {
Multiset<ContactId> m = connections.get(t);
if (m == null) throw new IllegalArgumentException();
synchronized (lock) {
Multiset<ContactId> m = contactConnections.get(t);
if (m == null || !m.contains(c))
throw new IllegalArgumentException();
m.remove(c);
if (contactCounts.remove(c) == 0) lastConnection = true;
} finally {
lock.unlock();
}
eventBus.broadcast(new ConnectionClosedEvent(c, t, incoming));
if (lastConnection) {
@@ -100,37 +104,47 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
@Override
public Collection<ContactId> getConnectedContacts(TransportId t) {
lock.lock();
try {
Multiset<ContactId> m = connections.get(t);
synchronized (lock) {
Multiset<ContactId> m = contactConnections.get(t);
if (m == null) return Collections.emptyList();
List<ContactId> ids = new ArrayList<>(m.keySet());
if (LOG.isLoggable(INFO))
LOG.info(ids.size() + " contacts connected: " + t);
return ids;
} finally {
lock.unlock();
}
}
@Override
public boolean isConnected(ContactId c, TransportId t) {
lock.lock();
try {
Multiset<ContactId> m = connections.get(t);
synchronized (lock) {
Multiset<ContactId> m = contactConnections.get(t);
return m != null && m.contains(c);
} finally {
lock.unlock();
}
}
@Override
public boolean isConnected(ContactId c) {
lock.lock();
try {
synchronized (lock) {
return contactCounts.contains(c);
} finally {
lock.unlock();
}
}
@Override
public boolean registerConnection(PendingContactId p) {
boolean added;
synchronized (lock) {
added = connectedPendingContacts.add(p);
}
if (added) eventBus.broadcast(new RendezvousConnectionOpenedEvent(p));
return added;
}
@Override
public void unregisterConnection(PendingContactId p, boolean success) {
synchronized (lock) {
if (!connectedPendingContacts.remove(p))
throw new IllegalArgumentException();
}
eventBus.broadcast(new RendezvousConnectionClosedEvent(p, success));
}
}