From a38113e8625ec6dc62c4038d42b1cc3ff961454d Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 23 May 2019 12:37:34 +0100 Subject: [PATCH] Add rendezvous poller. --- .../api/plugin/duplex/DuplexPlugin.java | 4 +- .../api/rendezvous/RendezvousConstants.java | 6 + .../event/RendezvousFailedEvent.java | 25 ++ .../bramble/BrambleCoreEagerSingletons.java | 4 + .../bramble/rendezvous/RendezvousModule.java | 19 + .../bramble/rendezvous/RendezvousPoller.java | 7 + .../rendezvous/RendezvousPollerImpl.java | 329 ++++++++++++++++++ 7 files changed, 393 insertions(+), 1 deletion(-) create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/event/RendezvousFailedEvent.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPoller.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java index 38ba819de..bc1973243 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/DuplexPlugin.java @@ -52,8 +52,10 @@ public interface DuplexPlugin extends Plugin { /** * Creates and returns an endpoint that uses the given key material to * rendezvous with a pending contact, and the given connection handler to - * handle incoming connections. + * handle incoming connections. Returns null if an endpoint cannot be + * created. */ + @Nullable RendezvousEndpoint createRendezvousEndpoint(KeyMaterialSource k, ConnectionHandler incoming); } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/RendezvousConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/RendezvousConstants.java index a27c15273..d114db90c 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/RendezvousConstants.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/RendezvousConstants.java @@ -1,6 +1,7 @@ package org.briarproject.bramble.api.rendezvous; import static java.util.concurrent.TimeUnit.DAYS; +import static java.util.concurrent.TimeUnit.MINUTES; public interface RendezvousConstants { @@ -14,6 +15,11 @@ public interface RendezvousConstants { */ long RENDEZVOUS_TIMEOUT_MS = DAYS.toMillis(2); + /** + * How often to try to rendezvous with pending contacts. + */ + long POLLING_INTERVAL_MS = MINUTES.toMillis(1); + /** * Label for deriving the rendezvous key from the handshake key pairs. */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/event/RendezvousFailedEvent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/event/RendezvousFailedEvent.java new file mode 100644 index 000000000..0530db1e2 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/rendezvous/event/RendezvousFailedEvent.java @@ -0,0 +1,25 @@ +package org.briarproject.bramble.api.rendezvous.event; + +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +import javax.annotation.concurrent.Immutable; + +/** + * An event that is broadcast when a rendezvous with a pending contact fails. + */ +@Immutable +@NotNullByDefault +public class RendezvousFailedEvent extends Event { + + private final PendingContactId pendingContactId; + + public RendezvousFailedEvent(PendingContactId pendingContactId) { + this.pendingContactId = pendingContactId; + } + + public PendingContactId getPendingContactId() { + return pendingContactId; + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java index 6ca1c4ec8..d0834d682 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreEagerSingletons.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.identity.IdentityModule; import org.briarproject.bramble.lifecycle.LifecycleModule; import org.briarproject.bramble.plugin.PluginModule; import org.briarproject.bramble.properties.PropertiesModule; +import org.briarproject.bramble.rendezvous.RendezvousModule; import org.briarproject.bramble.sync.validation.ValidationModule; import org.briarproject.bramble.system.SystemModule; import org.briarproject.bramble.transport.TransportModule; @@ -28,6 +29,8 @@ public interface BrambleCoreEagerSingletons { void inject(PropertiesModule.EagerSingletons init); + void inject(RendezvousModule.EagerSingletons init); + void inject(SystemModule.EagerSingletons init); void inject(TransportModule.EagerSingletons init); @@ -42,6 +45,7 @@ public interface BrambleCoreEagerSingletons { inject(new DatabaseExecutorModule.EagerSingletons()); inject(new IdentityModule.EagerSingletons()); inject(new LifecycleModule.EagerSingletons()); + inject(new RendezvousModule.EagerSingletons()); inject(new PluginModule.EagerSingletons()); inject(new PropertiesModule.EagerSingletons()); inject(new SystemModule.EagerSingletons()); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousModule.java b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousModule.java index f166303a5..f1693816d 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousModule.java @@ -1,16 +1,35 @@ package org.briarproject.bramble.rendezvous; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.rendezvous.RendezvousCrypto; +import javax.inject.Inject; +import javax.inject.Singleton; + import dagger.Module; import dagger.Provides; @Module public class RendezvousModule { + public static class EagerSingletons { + @Inject + RendezvousPoller rendezvousPoller; + } + @Provides RendezvousCrypto provideRendezvousCrypto( RendezvousCryptoImpl rendezvousCrypto) { return rendezvousCrypto; } + + @Provides + @Singleton + RendezvousPoller provideRendezvousPoller(LifecycleManager lifecycleManager, + EventBus eventBus, RendezvousPollerImpl rendezvousPoller) { + lifecycleManager.registerService(rendezvousPoller); + eventBus.addListener(rendezvousPoller); + return rendezvousPoller; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPoller.java b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPoller.java new file mode 100644 index 000000000..4ecfc32a6 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPoller.java @@ -0,0 +1,7 @@ +package org.briarproject.bramble.rendezvous; + +/** + * Empty interface for injecting the rendezvous poller. + */ +interface RendezvousPoller { +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java new file mode 100644 index 000000000..53f5d5c17 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java @@ -0,0 +1,329 @@ +package org.briarproject.bramble.rendezvous; + +import org.briarproject.bramble.PoliteExecutor; +import org.briarproject.bramble.api.Pair; +import org.briarproject.bramble.api.contact.PendingContact; +import org.briarproject.bramble.api.contact.PendingContactId; +import org.briarproject.bramble.api.contact.event.PendingContactAddedEvent; +import org.briarproject.bramble.api.contact.event.PendingContactRemovedEvent; +import org.briarproject.bramble.api.crypto.KeyPair; +import org.briarproject.bramble.api.crypto.SecretKey; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.event.EventExecutor; +import org.briarproject.bramble.api.event.EventListener; +import org.briarproject.bramble.api.identity.IdentityManager; +import org.briarproject.bramble.api.lifecycle.IoExecutor; +import org.briarproject.bramble.api.lifecycle.Service; +import org.briarproject.bramble.api.lifecycle.ServiceException; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.ConnectionHandler; +import org.briarproject.bramble.api.plugin.ConnectionManager; +import org.briarproject.bramble.api.plugin.Plugin; +import org.briarproject.bramble.api.plugin.PluginManager; +import org.briarproject.bramble.api.plugin.TransportConnectionReader; +import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; +import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent; +import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent; +import org.briarproject.bramble.api.properties.TransportProperties; +import org.briarproject.bramble.api.rendezvous.KeyMaterialSource; +import org.briarproject.bramble.api.rendezvous.RendezvousCrypto; +import org.briarproject.bramble.api.rendezvous.RendezvousEndpoint; +import org.briarproject.bramble.api.rendezvous.event.RendezvousFailedEvent; +import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.system.Scheduler; + +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Logger; + +import javax.annotation.Nullable; +import javax.inject.Inject; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNull; +import static org.briarproject.bramble.api.rendezvous.RendezvousConstants.POLLING_INTERVAL_MS; +import static org.briarproject.bramble.api.rendezvous.RendezvousConstants.RENDEZVOUS_TIMEOUT_MS; +import static org.briarproject.bramble.util.IoUtils.tryToClose; +import static org.briarproject.bramble.util.LogUtils.logException; + +@NotNullByDefault +class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { + + private static final Logger LOG = + getLogger(RendezvousPollerImpl.class.getName()); + + private final ScheduledExecutorService scheduler; + private final DatabaseComponent db; + private final IdentityManager identityManager; + private final RendezvousCrypto rendezvousCrypto; + private final PluginManager pluginManager; + private final ConnectionManager connectionManager; + private final EventBus eventBus; + private final Clock clock; + + // Executor that runs one task at a time + private final Executor worker; + // The following fields are only accessed on the worker + private final Map pluginStates = new HashMap<>(); + private final Map rendezvousKeys = + new HashMap<>(); + @Nullable + private KeyPair handshakeKeyPair = null; + + @Inject + RendezvousPollerImpl(@IoExecutor Executor ioExecutor, + @Scheduler ScheduledExecutorService scheduler, + DatabaseComponent db, + IdentityManager identityManager, + RendezvousCrypto rendezvousCrypto, + PluginManager pluginManager, + ConnectionManager connectionManager, + EventBus eventBus, + Clock clock) { + this.scheduler = scheduler; + this.db = db; + this.identityManager = identityManager; + this.rendezvousCrypto = rendezvousCrypto; + this.pluginManager = pluginManager; + this.connectionManager = connectionManager; + this.eventBus = eventBus; + this.clock = clock; + worker = new PoliteExecutor("RendezvousPoller", ioExecutor, 1); + } + + @Override + public void startService() throws ServiceException { + try { + db.transaction(true, txn -> { + Collection pending = db.getPendingContacts(txn); + // Use a commit action to prevent races with add/remove events + txn.attach(() -> addPendingContactsAsync(pending)); + }); + } catch (DbException e) { + throw new ServiceException(e); + } + scheduler.scheduleAtFixedRate(this::poll, POLLING_INTERVAL_MS, + POLLING_INTERVAL_MS, MILLISECONDS); + } + + @EventExecutor + private void addPendingContactsAsync(Collection pending) { + worker.execute(() -> { + for (PendingContact p : pending) addPendingContact(p); + }); + } + + // Worker + private void addPendingContact(PendingContact p) { + long now = clock.currentTimeMillis(); + long expiry = p.getTimestamp() + RENDEZVOUS_TIMEOUT_MS; + if (expiry > now) { + scheduler.schedule(() -> expirePendingContactAsync(p.getId()), + expiry - now, MILLISECONDS); + } else { + eventBus.broadcast(new RendezvousFailedEvent(p.getId())); + return; + } + try { + if (handshakeKeyPair == null) { + handshakeKeyPair = db.transactionWithResult(true, + identityManager::getHandshakeKeys); + } + SecretKey rendezvousKey = rendezvousCrypto + .deriveRendezvousKey(p.getPublicKey(), handshakeKeyPair); + requireNull(rendezvousKeys.put(p.getId(), rendezvousKey)); + for (PluginState ps : pluginStates.values()) { + RendezvousEndpoint endpoint = + createEndpoint(ps.plugin, p.getId(), rendezvousKey); + if (endpoint != null) + requireNull(ps.endpoints.put(p.getId(), endpoint)); + } + } catch (DbException | GeneralSecurityException e) { + logException(LOG, WARNING, e); + } + } + + @Scheduler + private void expirePendingContactAsync(PendingContactId p) { + worker.execute(() -> expirePendingContact(p)); + } + + // Worker + private void expirePendingContact(PendingContactId p) { + if (removePendingContact(p)) + eventBus.broadcast(new RendezvousFailedEvent(p)); + } + + // Worker + private boolean removePendingContact(PendingContactId p) { + // We can come here twice if a pending contact fails and is then removed + if (rendezvousKeys.remove(p) == null) return false; + for (PluginState state : pluginStates.values()) { + RendezvousEndpoint endpoint = state.endpoints.remove(p); + if (endpoint != null) tryToClose(endpoint, LOG, INFO); + } + return true; + } + + @Nullable + private RendezvousEndpoint createEndpoint(DuplexPlugin plugin, + PendingContactId p, SecretKey rendezvousKey) { + TransportId t = plugin.getId(); + KeyMaterialSource k = + rendezvousCrypto.createKeyMaterialSource(rendezvousKey, t); + Handler h = new Handler(p, t, true); + return plugin.createRendezvousEndpoint(k, h); + } + + @Scheduler + private void poll() { + worker.execute(() -> { + for (PluginState state : pluginStates.values()) poll(state); + }); + } + + // Worker + private void poll(PluginState state) { + List> properties = + new ArrayList<>(); + for (Entry e : + state.endpoints.entrySet()) { + TransportProperties p = + e.getValue().getRemoteTransportProperties(); + Handler h = new Handler(e.getKey(), state.plugin.getId(), false); + properties.add(new Pair<>(p, h)); + } + state.plugin.poll(properties); + } + + @Override + public void stopService() { + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof PendingContactAddedEvent) { + PendingContactAddedEvent p = (PendingContactAddedEvent) e; + addPendingContactAsync(p.getPendingContact()); + } else if (e instanceof PendingContactRemovedEvent) { + PendingContactRemovedEvent p = (PendingContactRemovedEvent) e; + removePendingContactAsync(p.getId()); + } else if (e instanceof TransportEnabledEvent) { + TransportEnabledEvent t = (TransportEnabledEvent) e; + addTransportAsync(t.getTransportId()); + } else if (e instanceof TransportDisabledEvent) { + TransportDisabledEvent t = (TransportDisabledEvent) e; + removeTransportAsync(t.getTransportId()); + } + } + + @EventExecutor + private void addPendingContactAsync(PendingContact p) { + worker.execute(() -> addPendingContact(p)); + } + + @EventExecutor + private void removePendingContactAsync(PendingContactId p) { + worker.execute(() -> removePendingContact(p)); + } + + @EventExecutor + private void addTransportAsync(TransportId t) { + Plugin p = pluginManager.getPlugin(t); + if (p instanceof DuplexPlugin) { + DuplexPlugin d = (DuplexPlugin) p; + if (d.supportsRendezvous()) + worker.execute(() -> addTransport(d)); + } + } + + // Worker + private void addTransport(DuplexPlugin plugin) { + TransportId t = plugin.getId(); + Map endpoints = new HashMap<>(); + for (Entry e : rendezvousKeys.entrySet()) { + RendezvousEndpoint endpoint = + createEndpoint(plugin, e.getKey(), e.getValue()); + if (endpoint != null) endpoints.put(e.getKey(), endpoint); + } + requireNull(pluginStates.put(t, new PluginState(plugin, endpoints))); + } + + @EventExecutor + private void removeTransportAsync(TransportId t) { + worker.execute(() -> removeTransport(t)); + } + + // Worker + private void removeTransport(TransportId t) { + PluginState state = pluginStates.remove(t); + if (state != null) { + for (RendezvousEndpoint endpoint : state.endpoints.values()) { + tryToClose(endpoint, LOG, INFO); + } + } + } + + private static class PluginState { + + private final DuplexPlugin plugin; + private final Map endpoints; + + private PluginState(DuplexPlugin plugin, + Map endpoints) { + this.plugin = plugin; + this.endpoints = endpoints; + } + } + + private class Handler implements ConnectionHandler { + + private final PendingContactId pendingContactId; + private final TransportId transportId; + private final boolean incoming; + + private Handler(PendingContactId pendingContactId, + TransportId transportId, boolean incoming) { + this.pendingContactId = pendingContactId; + this.transportId = transportId; + this.incoming = incoming; + } + + @Override + public void handleConnection(DuplexTransportConnection c) { + if (incoming) { + connectionManager.manageIncomingConnection(pendingContactId, + transportId, c); + } else { + connectionManager.manageOutgoingConnection(pendingContactId, + transportId, c); + } + } + + @Override + public void handleReader(TransportConnectionReader r) { + throw new UnsupportedOperationException(); + } + + @Override + public void handleWriter(TransportConnectionWriter w) { + throw new UnsupportedOperationException(); + } + } +}