mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-16 12:49:55 +01:00
Add transports to DB during startup. #269
This commit is contained in:
@@ -324,11 +324,6 @@ interface Database<T> {
|
||||
Map<ContactId, TransportKeys> getTransportKeys(T txn, TransportId t)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the maximum latencies in milliseconds of all transports.
|
||||
*/
|
||||
Map<TransportId, Integer> getTransportLatencies(T txn) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of all contacts to which the given group is visible.
|
||||
*/
|
||||
|
||||
@@ -33,8 +33,6 @@ import org.briarproject.api.event.MessageValidatedEvent;
|
||||
import org.briarproject.api.event.MessagesAckedEvent;
|
||||
import org.briarproject.api.event.MessagesSentEvent;
|
||||
import org.briarproject.api.event.SettingsUpdatedEvent;
|
||||
import org.briarproject.api.event.TransportAddedEvent;
|
||||
import org.briarproject.api.event.TransportRemovedEvent;
|
||||
import org.briarproject.api.identity.Author;
|
||||
import org.briarproject.api.identity.AuthorId;
|
||||
import org.briarproject.api.identity.LocalAuthor;
|
||||
@@ -192,10 +190,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
public void addTransport(Transaction transaction, TransportId t,
|
||||
int maxLatency) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsTransport(txn, t)) {
|
||||
if (!db.containsTransport(txn, t))
|
||||
db.addTransport(txn, t, maxLatency);
|
||||
transaction.attach(new TransportAddedEvent(t, maxLatency));
|
||||
}
|
||||
}
|
||||
|
||||
public void addTransportKeys(Transaction transaction, ContactId c,
|
||||
@@ -420,12 +416,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
return db.getTransportKeys(txn, t);
|
||||
}
|
||||
|
||||
public Map<TransportId, Integer> getTransportLatencies(
|
||||
Transaction transaction) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
return db.getTransportLatencies(txn);
|
||||
}
|
||||
|
||||
public void incrementStreamCounter(Transaction transaction, ContactId c,
|
||||
TransportId t, long rotationPeriod) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
@@ -579,7 +569,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
if (!db.containsTransport(txn, t))
|
||||
throw new NoSuchTransportException();
|
||||
db.removeTransport(txn, t);
|
||||
transaction.attach(new TransportRemovedEvent(t));
|
||||
}
|
||||
|
||||
public void setContactActive(Transaction transaction, ContactId c,
|
||||
|
||||
@@ -1542,30 +1542,6 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public Map<TransportId, Integer> getTransportLatencies(Connection txn)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT transportId, maxLatency FROM transports";
|
||||
ps = txn.prepareStatement(sql);
|
||||
rs = ps.executeQuery();
|
||||
Map<TransportId, Integer> latencies =
|
||||
new HashMap<TransportId, Integer>();
|
||||
while (rs.next()) {
|
||||
TransportId id = new TransportId(rs.getString(1));
|
||||
latencies.put(id, rs.getInt(2));
|
||||
}
|
||||
rs.close();
|
||||
ps.close();
|
||||
return Collections.unmodifiableMap(latencies);
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs);
|
||||
tryToClose(ps);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<ContactId> getVisibility(Connection txn, GroupId g)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
|
||||
@@ -6,7 +6,6 @@ import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.event.ShutdownEvent;
|
||||
import org.briarproject.api.lifecycle.LifecycleManager;
|
||||
import org.briarproject.api.lifecycle.Service;
|
||||
import org.briarproject.api.system.Clock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
@@ -30,7 +29,6 @@ class LifecycleManagerImpl implements LifecycleManager {
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(LifecycleManagerImpl.class.getName());
|
||||
|
||||
private final Clock clock;
|
||||
private final DatabaseComponent db;
|
||||
private final EventBus eventBus;
|
||||
private final Collection<Service> services;
|
||||
@@ -41,8 +39,7 @@ class LifecycleManagerImpl implements LifecycleManager {
|
||||
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
||||
|
||||
@Inject
|
||||
LifecycleManagerImpl(Clock clock, DatabaseComponent db, EventBus eventBus) {
|
||||
this.clock = clock;
|
||||
LifecycleManagerImpl(DatabaseComponent db, EventBus eventBus) {
|
||||
this.db = db;
|
||||
this.eventBus = eventBus;
|
||||
services = new CopyOnWriteArrayList<Service>();
|
||||
@@ -68,9 +65,9 @@ class LifecycleManagerImpl implements LifecycleManager {
|
||||
}
|
||||
try {
|
||||
LOG.info("Starting services");
|
||||
long now = clock.currentTimeMillis();
|
||||
long start = System.currentTimeMillis();
|
||||
boolean reopened = db.open();
|
||||
long duration = clock.currentTimeMillis() - now;
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
if (reopened)
|
||||
LOG.info("Reopening database took " + duration + " ms");
|
||||
@@ -78,9 +75,9 @@ class LifecycleManagerImpl implements LifecycleManager {
|
||||
}
|
||||
dbLatch.countDown();
|
||||
for (Service s : services) {
|
||||
now = clock.currentTimeMillis();
|
||||
start = System.currentTimeMillis();
|
||||
boolean started = s.start();
|
||||
duration = clock.currentTimeMillis() - now;
|
||||
duration = System.currentTimeMillis() - start;
|
||||
if (!started) {
|
||||
if (LOG.isLoggable(WARNING)) {
|
||||
String name = s.getClass().getName();
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package org.briarproject.lifecycle;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.lifecycle.IoExecutor;
|
||||
import org.briarproject.api.lifecycle.LifecycleManager;
|
||||
import org.briarproject.api.lifecycle.ShutdownManager;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
@@ -12,16 +16,11 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.lifecycle.IoExecutor;
|
||||
import org.briarproject.api.lifecycle.LifecycleManager;
|
||||
import org.briarproject.api.lifecycle.ShutdownManager;
|
||||
import org.briarproject.api.system.Clock;
|
||||
|
||||
import dagger.Module;
|
||||
import dagger.Provides;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
@Module
|
||||
public class LifecycleModule {
|
||||
|
||||
@@ -51,9 +50,9 @@ public class LifecycleModule {
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
LifecycleManager provideLifeCycleManager(Clock clock, DatabaseComponent db,
|
||||
LifecycleManager provideLifecycleManager(DatabaseComponent db,
|
||||
EventBus eventBus) {
|
||||
return new LifecycleManagerImpl(clock, db, eventBus);
|
||||
return new LifecycleManagerImpl(db, eventBus);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@@ -63,5 +62,4 @@ public class LifecycleModule {
|
||||
lifecycleManager.registerForShutdown(ioExecutor);
|
||||
return ioExecutor;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -90,8 +90,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
TransportConnectionReader r) throws IOException {
|
||||
InputStream streamReader = streamReaderFactory.createStreamReader(
|
||||
r.getInputStream(), ctx);
|
||||
return syncSessionFactory.createIncomingSession(
|
||||
ctx.getContactId(), ctx.getTransportId(), streamReader);
|
||||
return syncSessionFactory.createIncomingSession(ctx.getContactId(),
|
||||
streamReader);
|
||||
}
|
||||
|
||||
private SyncSession createSimplexOutgoingSession(StreamContext ctx,
|
||||
@@ -99,8 +99,7 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
OutputStream streamWriter = streamWriterFactory.createStreamWriter(
|
||||
w.getOutputStream(), ctx);
|
||||
return syncSessionFactory.createSimplexOutgoingSession(
|
||||
ctx.getContactId(), ctx.getTransportId(), w.getMaxLatency(),
|
||||
streamWriter);
|
||||
ctx.getContactId(), w.getMaxLatency(), streamWriter);
|
||||
}
|
||||
|
||||
private SyncSession createDuplexOutgoingSession(StreamContext ctx,
|
||||
@@ -108,8 +107,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
OutputStream streamWriter = streamWriterFactory.createStreamWriter(
|
||||
w.getOutputStream(), ctx);
|
||||
return syncSessionFactory.createDuplexOutgoingSession(
|
||||
ctx.getContactId(), ctx.getTransportId(), w.getMaxLatency(),
|
||||
w.getMaxIdleTime(), streamWriter);
|
||||
ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(),
|
||||
streamWriter);
|
||||
}
|
||||
|
||||
private class ManageIncomingSimplexConnection implements Runnable {
|
||||
|
||||
@@ -2,9 +2,7 @@ package org.briarproject.plugins;
|
||||
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.contact.ContactId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DbException;
|
||||
import org.briarproject.api.db.Transaction;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.event.TransportDisabledEvent;
|
||||
import org.briarproject.api.event.TransportEnabledEvent;
|
||||
@@ -28,7 +26,6 @@ import org.briarproject.api.properties.TransportProperties;
|
||||
import org.briarproject.api.properties.TransportPropertyManager;
|
||||
import org.briarproject.api.settings.Settings;
|
||||
import org.briarproject.api.settings.SettingsManager;
|
||||
import org.briarproject.api.system.Clock;
|
||||
import org.briarproject.api.ui.UiCallback;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -56,8 +53,6 @@ class PluginManagerImpl implements PluginManager, Service {
|
||||
private final Executor ioExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final PluginConfig pluginConfig;
|
||||
private final Clock clock;
|
||||
private final DatabaseComponent db;
|
||||
private final Poller poller;
|
||||
private final ConnectionManager connectionManager;
|
||||
private final SettingsManager settingsManager;
|
||||
@@ -69,8 +64,7 @@ class PluginManagerImpl implements PluginManager, Service {
|
||||
|
||||
@Inject
|
||||
PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus,
|
||||
PluginConfig pluginConfig, Clock clock,
|
||||
DatabaseComponent db, Poller poller,
|
||||
PluginConfig pluginConfig, Poller poller,
|
||||
ConnectionManager connectionManager,
|
||||
SettingsManager settingsManager,
|
||||
TransportPropertyManager transportPropertyManager,
|
||||
@@ -78,8 +72,6 @@ class PluginManagerImpl implements PluginManager, Service {
|
||||
this.ioExecutor = ioExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.pluginConfig = pluginConfig;
|
||||
this.clock = clock;
|
||||
this.db = db;
|
||||
this.poller = poller;
|
||||
this.connectionManager = connectionManager;
|
||||
this.settingsManager = settingsManager;
|
||||
@@ -181,26 +173,9 @@ class PluginManagerImpl implements PluginManager, Service {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
long start = clock.currentTimeMillis();
|
||||
Transaction txn = db.startTransaction();
|
||||
try {
|
||||
db.addTransport(txn, id, plugin.getMaxLatency());
|
||||
txn.setComplete();
|
||||
} finally {
|
||||
db.endTransaction(txn);
|
||||
}
|
||||
long duration = clock.currentTimeMillis() - start;
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Adding transport took " + duration + " ms");
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
long start = clock.currentTimeMillis();
|
||||
long start = System.currentTimeMillis();
|
||||
boolean started = plugin.start();
|
||||
long duration = clock.currentTimeMillis() - start;
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
if (started) {
|
||||
plugins.put(id, plugin);
|
||||
simplexPlugins.add(plugin);
|
||||
@@ -250,26 +225,9 @@ class PluginManagerImpl implements PluginManager, Service {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
long start = clock.currentTimeMillis();
|
||||
Transaction txn = db.startTransaction();
|
||||
try {
|
||||
db.addTransport(txn, id, plugin.getMaxLatency());
|
||||
txn.setComplete();
|
||||
} finally {
|
||||
db.endTransaction(txn);
|
||||
}
|
||||
long duration = clock.currentTimeMillis() - start;
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Adding transport took " + duration + " ms");
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING))
|
||||
LOG.log(WARNING, e.toString(), e);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
long start = clock.currentTimeMillis();
|
||||
long start = System.currentTimeMillis();
|
||||
boolean started = plugin.start();
|
||||
long duration = clock.currentTimeMillis() - start;
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
if (started) {
|
||||
plugins.put(id, plugin);
|
||||
duplexPlugins.add(plugin);
|
||||
@@ -307,9 +265,9 @@ class PluginManagerImpl implements PluginManager, Service {
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
long start = clock.currentTimeMillis();
|
||||
long start = System.currentTimeMillis();
|
||||
plugin.stop();
|
||||
long duration = clock.currentTimeMillis() - start;
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
String name = plugin.getClass().getSimpleName();
|
||||
LOG.info("Stopping " + name + " took " + duration + " ms");
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
package org.briarproject.plugins;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.lifecycle.IoExecutor;
|
||||
import org.briarproject.api.lifecycle.LifecycleManager;
|
||||
@@ -19,10 +16,12 @@ import org.briarproject.api.transport.StreamWriterFactory;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import dagger.Module;
|
||||
import dagger.Provides;
|
||||
|
||||
|
||||
@Module
|
||||
public class PluginsModule {
|
||||
|
||||
@@ -45,8 +44,8 @@ public class PluginsModule {
|
||||
|
||||
@Provides
|
||||
ConnectionManager provideConnectionManager(
|
||||
@IoExecutor Executor ioExecutor,
|
||||
KeyManager keyManager, StreamReaderFactory streamReaderFactory,
|
||||
@IoExecutor Executor ioExecutor, KeyManager keyManager,
|
||||
StreamReaderFactory streamReaderFactory,
|
||||
StreamWriterFactory streamWriterFactory,
|
||||
SyncSessionFactory syncSessionFactory,
|
||||
ConnectionRegistry connectionRegistry) {
|
||||
@@ -61,7 +60,6 @@ public class PluginsModule {
|
||||
return new ConnectionRegistryImpl(eventBus);
|
||||
}
|
||||
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
PluginManager getPluginManager(LifecycleManager lifecycleManager,
|
||||
@@ -69,5 +67,4 @@ public class PluginsModule {
|
||||
lifecycleManager.register(pluginManager);
|
||||
return pluginManager;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
package org.briarproject.plugins.file;
|
||||
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import org.briarproject.api.plugins.TransportConnectionReader;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.briarproject.api.plugins.TransportConnectionReader;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
|
||||
class FileTransportReader implements TransportConnectionReader {
|
||||
|
||||
@@ -24,10 +24,6 @@ class FileTransportReader implements TransportConnectionReader {
|
||||
this.plugin = plugin;
|
||||
}
|
||||
|
||||
public long getMaxLatency() {
|
||||
return plugin.getMaxLatency();
|
||||
}
|
||||
|
||||
public InputStream getInputStream() {
|
||||
return in;
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
|
||||
return LanTcpPlugin.ID;
|
||||
}
|
||||
|
||||
public int getMaxLatency() {
|
||||
return MAX_LATENCY;
|
||||
}
|
||||
|
||||
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
|
||||
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
|
||||
MAX_POLLING_INTERVAL, BACKOFF_BASE);
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
package org.briarproject.plugins.tcp;
|
||||
|
||||
import org.briarproject.api.plugins.Plugin;
|
||||
import org.briarproject.api.plugins.TransportConnectionReader;
|
||||
import org.briarproject.api.plugins.TransportConnectionWriter;
|
||||
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.briarproject.api.plugins.Plugin;
|
||||
import org.briarproject.api.plugins.TransportConnectionReader;
|
||||
import org.briarproject.api.plugins.TransportConnectionWriter;
|
||||
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
|
||||
|
||||
class TcpTransportConnection implements DuplexTransportConnection {
|
||||
|
||||
private final Plugin plugin;
|
||||
@@ -38,10 +38,6 @@ class TcpTransportConnection implements DuplexTransportConnection {
|
||||
|
||||
private class Reader implements TransportConnectionReader {
|
||||
|
||||
public long getMaxLatency() {
|
||||
return plugin.getMaxLatency();
|
||||
}
|
||||
|
||||
public InputStream getInputStream() throws IOException {
|
||||
return socket.getInputStream();
|
||||
}
|
||||
|
||||
@@ -33,6 +33,10 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
|
||||
return WanTcpPlugin.ID;
|
||||
}
|
||||
|
||||
public int getMaxLatency() {
|
||||
return MAX_LATENCY;
|
||||
}
|
||||
|
||||
public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
|
||||
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
|
||||
MAX_POLLING_INTERVAL, BACKOFF_BASE);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.briarproject.sync;
|
||||
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.contact.ContactId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DbException;
|
||||
@@ -15,7 +14,6 @@ import org.briarproject.api.event.MessageSharedEvent;
|
||||
import org.briarproject.api.event.MessageToAckEvent;
|
||||
import org.briarproject.api.event.MessageToRequestEvent;
|
||||
import org.briarproject.api.event.ShutdownEvent;
|
||||
import org.briarproject.api.event.TransportRemovedEvent;
|
||||
import org.briarproject.api.sync.Ack;
|
||||
import org.briarproject.api.sync.Offer;
|
||||
import org.briarproject.api.sync.PacketWriter;
|
||||
@@ -59,7 +57,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
private final EventBus eventBus;
|
||||
private final Clock clock;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final int maxLatency, maxIdleTime;
|
||||
private final PacketWriter packetWriter;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
@@ -67,15 +64,13 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, Clock clock, ContactId contactId,
|
||||
TransportId transportId, int maxLatency, int maxIdleTime,
|
||||
PacketWriter packetWriter) {
|
||||
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
|
||||
int maxIdleTime, PacketWriter packetWriter) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.clock = clock;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.maxIdleTime = maxIdleTime;
|
||||
this.packetWriter = packetWriter;
|
||||
@@ -167,9 +162,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
dbExecutor.execute(new GenerateRequest());
|
||||
} else if (e instanceof ShutdownEvent) {
|
||||
interrupt();
|
||||
} else if (e instanceof TransportRemovedEvent) {
|
||||
TransportRemovedEvent t = (TransportRemovedEvent) e;
|
||||
if (t.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package org.briarproject.sync;
|
||||
|
||||
import org.briarproject.api.FormatException;
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.contact.ContactId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DbException;
|
||||
@@ -11,7 +10,6 @@ import org.briarproject.api.event.Event;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.event.EventListener;
|
||||
import org.briarproject.api.event.ShutdownEvent;
|
||||
import org.briarproject.api.event.TransportRemovedEvent;
|
||||
import org.briarproject.api.sync.Ack;
|
||||
import org.briarproject.api.sync.Message;
|
||||
import org.briarproject.api.sync.Offer;
|
||||
@@ -37,19 +35,17 @@ class IncomingSession implements SyncSession, EventListener {
|
||||
private final Executor dbExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final PacketReader packetReader;
|
||||
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
IncomingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, ContactId contactId, TransportId transportId,
|
||||
EventBus eventBus, ContactId contactId,
|
||||
PacketReader packetReader) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.packetReader = packetReader;
|
||||
}
|
||||
|
||||
@@ -90,9 +86,6 @@ class IncomingSession implements SyncSession, EventListener {
|
||||
if (c.getContactId().equals(contactId)) interrupt();
|
||||
} else if (e instanceof ShutdownEvent) {
|
||||
interrupt();
|
||||
} else if (e instanceof TransportRemovedEvent) {
|
||||
TransportRemovedEvent t = (TransportRemovedEvent) e;
|
||||
if (t.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.briarproject.sync;
|
||||
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.contact.ContactId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DbException;
|
||||
@@ -10,7 +9,6 @@ import org.briarproject.api.event.Event;
|
||||
import org.briarproject.api.event.EventBus;
|
||||
import org.briarproject.api.event.EventListener;
|
||||
import org.briarproject.api.event.ShutdownEvent;
|
||||
import org.briarproject.api.event.TransportRemovedEvent;
|
||||
import org.briarproject.api.sync.Ack;
|
||||
import org.briarproject.api.sync.PacketWriter;
|
||||
import org.briarproject.api.sync.SyncSession;
|
||||
@@ -48,7 +46,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
private final Executor dbExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final int maxLatency;
|
||||
private final PacketWriter packetWriter;
|
||||
private final AtomicInteger outstandingQueries;
|
||||
@@ -57,13 +54,12 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, ContactId contactId, TransportId transportId,
|
||||
EventBus eventBus, ContactId contactId,
|
||||
int maxLatency, PacketWriter packetWriter) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.packetWriter = packetWriter;
|
||||
outstandingQueries = new AtomicInteger(2); // One per type of packet
|
||||
@@ -108,9 +104,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
if (c.getContactId().equals(contactId)) interrupt();
|
||||
} else if (e instanceof ShutdownEvent) {
|
||||
interrupt();
|
||||
} else if (e instanceof TransportRemovedEvent) {
|
||||
TransportRemovedEvent t = (TransportRemovedEvent) e;
|
||||
if (t.getTransportId().equals(transportId)) interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.briarproject.sync;
|
||||
|
||||
import org.briarproject.api.TransportId;
|
||||
import org.briarproject.api.contact.ContactId;
|
||||
import org.briarproject.api.db.DatabaseComponent;
|
||||
import org.briarproject.api.db.DatabaseExecutor;
|
||||
@@ -41,24 +40,22 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
|
||||
this.packetWriterFactory = packetWriterFactory;
|
||||
}
|
||||
|
||||
public SyncSession createIncomingSession(ContactId c, TransportId t,
|
||||
InputStream in) {
|
||||
public SyncSession createIncomingSession(ContactId c, InputStream in) {
|
||||
PacketReader packetReader = packetReaderFactory.createPacketReader(in);
|
||||
return new IncomingSession(db, dbExecutor, eventBus, c, t,
|
||||
packetReader);
|
||||
return new IncomingSession(db, dbExecutor, eventBus, c, packetReader);
|
||||
}
|
||||
|
||||
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
||||
public SyncSession createSimplexOutgoingSession(ContactId c,
|
||||
int maxLatency, OutputStream out) {
|
||||
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
|
||||
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
|
||||
maxLatency, packetWriter);
|
||||
}
|
||||
|
||||
public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
|
||||
int maxLatency, int maxIdleTime, OutputStream out) {
|
||||
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
|
||||
int maxIdleTime, OutputStream out) {
|
||||
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
|
||||
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
|
||||
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
|
||||
maxLatency, maxIdleTime, packetWriter);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,15 +13,17 @@ import org.briarproject.api.event.ContactRemovedEvent;
|
||||
import org.briarproject.api.event.ContactStatusChangedEvent;
|
||||
import org.briarproject.api.event.Event;
|
||||
import org.briarproject.api.event.EventListener;
|
||||
import org.briarproject.api.event.TransportAddedEvent;
|
||||
import org.briarproject.api.event.TransportRemovedEvent;
|
||||
import org.briarproject.api.lifecycle.Service;
|
||||
import org.briarproject.api.plugins.PluginConfig;
|
||||
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
|
||||
import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
|
||||
import org.briarproject.api.system.Clock;
|
||||
import org.briarproject.api.system.Timer;
|
||||
import org.briarproject.api.transport.KeyManager;
|
||||
import org.briarproject.api.transport.StreamContext;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@@ -40,6 +42,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
|
||||
private final DatabaseComponent db;
|
||||
private final CryptoComponent crypto;
|
||||
private final ExecutorService dbExecutor;
|
||||
private final PluginConfig pluginConfig;
|
||||
private final Timer timer;
|
||||
private final Clock clock;
|
||||
private final Map<ContactId, Boolean> activeContacts;
|
||||
@@ -47,11 +50,12 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
|
||||
|
||||
@Inject
|
||||
KeyManagerImpl(DatabaseComponent db, CryptoComponent crypto,
|
||||
@DatabaseExecutor ExecutorService dbExecutor, Timer timer,
|
||||
Clock clock) {
|
||||
@DatabaseExecutor ExecutorService dbExecutor,
|
||||
PluginConfig pluginConfig, Timer timer, Clock clock) {
|
||||
this.db = db;
|
||||
this.crypto = crypto;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.pluginConfig = pluginConfig;
|
||||
this.timer = timer;
|
||||
this.clock = clock;
|
||||
// Use a ConcurrentHashMap as a thread-safe set
|
||||
@@ -61,21 +65,31 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
|
||||
|
||||
@Override
|
||||
public boolean start() {
|
||||
Map<TransportId, Integer> latencies =
|
||||
new HashMap<TransportId, Integer>();
|
||||
for (SimplexPluginFactory f : pluginConfig.getSimplexFactories())
|
||||
latencies.put(f.getId(), f.getMaxLatency());
|
||||
for (DuplexPluginFactory f : pluginConfig.getDuplexFactories())
|
||||
latencies.put(f.getId(), f.getMaxLatency());
|
||||
try {
|
||||
Collection<Contact> contacts;
|
||||
Map<TransportId, Integer> latencies;
|
||||
Transaction txn = db.startTransaction();
|
||||
try {
|
||||
contacts = db.getContacts(txn);
|
||||
latencies = db.getTransportLatencies(txn);
|
||||
for (Entry<TransportId, Integer> e : latencies.entrySet())
|
||||
db.addTransport(txn, e.getKey(), e.getValue());
|
||||
txn.setComplete();
|
||||
} finally {
|
||||
db.endTransaction(txn);
|
||||
}
|
||||
for (Contact c : contacts)
|
||||
if (c.isActive()) activeContacts.put(c.getId(), true);
|
||||
for (Entry<TransportId, Integer> e : latencies.entrySet())
|
||||
addTransport(e.getKey(), e.getValue());
|
||||
for (Entry<TransportId, Integer> e : latencies.entrySet()) {
|
||||
TransportKeyManager m = new TransportKeyManager(db, crypto,
|
||||
timer, clock, e.getKey(), e.getValue());
|
||||
managers.put(e.getKey(), m);
|
||||
m.start();
|
||||
}
|
||||
} catch (DbException e) {
|
||||
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
|
||||
return false;
|
||||
@@ -125,12 +139,7 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
|
||||
}
|
||||
|
||||
public void eventOccurred(Event e) {
|
||||
if (e instanceof TransportAddedEvent) {
|
||||
TransportAddedEvent t = (TransportAddedEvent) e;
|
||||
addTransport(t.getTransportId(), t.getMaxLatency());
|
||||
} else if (e instanceof TransportRemovedEvent) {
|
||||
removeTransport(((TransportRemovedEvent) e).getTransportId());
|
||||
} else if (e instanceof ContactRemovedEvent) {
|
||||
if (e instanceof ContactRemovedEvent) {
|
||||
removeContact(((ContactRemovedEvent) e).getContactId());
|
||||
} else if (e instanceof ContactStatusChangedEvent) {
|
||||
ContactStatusChangedEvent c = (ContactStatusChangedEvent) e;
|
||||
@@ -139,21 +148,6 @@ class KeyManagerImpl implements KeyManager, Service, EventListener {
|
||||
}
|
||||
}
|
||||
|
||||
private void addTransport(final TransportId t, final int maxLatency) {
|
||||
dbExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
TransportKeyManager m = new TransportKeyManager(db, crypto,
|
||||
timer, clock, t, maxLatency);
|
||||
// Don't add transport twice if event is received during startup
|
||||
if (managers.putIfAbsent(t, m) == null) m.start();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void removeTransport(TransportId t) {
|
||||
managers.remove(t);
|
||||
}
|
||||
|
||||
private void removeContact(final ContactId c) {
|
||||
activeContacts.remove(c);
|
||||
dbExecutor.execute(new Runnable() {
|
||||
|
||||
Reference in New Issue
Block a user