mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 18:59:06 +01:00
Added a connection registry to avoid creating redundant connections.
This commit is contained in:
@@ -51,8 +51,8 @@ class PluginManagerImpl implements PluginManager {
|
||||
"net.sf.briar.plugins.socket.SimpleSocketPluginFactory"
|
||||
};
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor pluginExecutor;
|
||||
private final DatabaseComponent db;
|
||||
private final Poller poller;
|
||||
private final ConnectionDispatcher dispatcher;
|
||||
private final UiCallback uiCallback;
|
||||
@@ -60,11 +60,11 @@ class PluginManagerImpl implements PluginManager {
|
||||
private final List<StreamPlugin> streamPlugins; // Locking: this
|
||||
|
||||
@Inject
|
||||
PluginManagerImpl(DatabaseComponent db,
|
||||
@PluginExecutor Executor pluginExecutor, Poller poller,
|
||||
PluginManagerImpl(@PluginExecutor Executor pluginExecutor,
|
||||
DatabaseComponent db, Poller poller,
|
||||
ConnectionDispatcher dispatcher, UiCallback uiCallback) {
|
||||
this.db = db;
|
||||
this.pluginExecutor = pluginExecutor;
|
||||
this.db = db;
|
||||
this.poller = poller;
|
||||
this.dispatcher = dispatcher;
|
||||
this.uiCallback = uiCallback;
|
||||
@@ -295,7 +295,7 @@ class PluginManagerImpl implements PluginManager {
|
||||
|
||||
public void writerCreated(ContactId c, BatchTransportWriter w) {
|
||||
assert index != null;
|
||||
dispatcher.dispatchWriter(c, index, w);
|
||||
dispatcher.dispatchWriter(c, id, index, w);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -310,7 +310,7 @@ class PluginManagerImpl implements PluginManager {
|
||||
public void outgoingConnectionCreated(ContactId c,
|
||||
StreamTransportConnection s) {
|
||||
assert index != null;
|
||||
dispatcher.dispatchOutgoingConnection(c, index, s);
|
||||
dispatcher.dispatchOutgoingConnection(c, id, index, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,14 +6,25 @@ import java.util.TreeSet;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.plugins.Plugin;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
class PollerImpl implements Poller, Runnable {
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(PollerImpl.class.getName());
|
||||
|
||||
private final SortedSet<PollTime> pollTimes = new TreeSet<PollTime>();
|
||||
private final ConnectionRegistry connRegistry;
|
||||
private final SortedSet<PollTime> pollTimes;
|
||||
|
||||
@Inject
|
||||
PollerImpl(ConnectionRegistry connRegistry) {
|
||||
this.connRegistry = connRegistry;
|
||||
pollTimes = new TreeSet<PollTime>();
|
||||
}
|
||||
|
||||
public synchronized void startPolling(Collection<Plugin> plugins) {
|
||||
for(Plugin plugin : plugins) schedule(plugin);
|
||||
@@ -41,8 +52,10 @@ class PollerImpl implements Poller, Runnable {
|
||||
long now = System.currentTimeMillis();
|
||||
if(now <= p.time) {
|
||||
pollTimes.remove(p);
|
||||
Collection<ContactId> connected =
|
||||
connRegistry.getConnectedContacts(p.plugin.getId());
|
||||
try {
|
||||
p.plugin.poll();
|
||||
p.plugin.poll(connected);
|
||||
} catch(RuntimeException e) {
|
||||
if(LOG.isLoggable(Level.WARNING))
|
||||
LOG.warning("Plugin " + p.plugin.getId() + " " + e);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package net.sf.briar.plugins.bluetooth;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -178,18 +179,18 @@ class BluetoothPlugin implements StreamPlugin {
|
||||
return pollingInterval;
|
||||
}
|
||||
|
||||
public void poll() {
|
||||
public void poll(final Collection<ContactId> connected) {
|
||||
synchronized(this) {
|
||||
if(!running) return;
|
||||
}
|
||||
pluginExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
connectAndCallBack();
|
||||
connectAndCallBack(connected);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void connectAndCallBack() {
|
||||
private void connectAndCallBack(Collection<ContactId> connected) {
|
||||
synchronized(this) {
|
||||
if(!running) return;
|
||||
}
|
||||
@@ -198,6 +199,8 @@ class BluetoothPlugin implements StreamPlugin {
|
||||
Map<ContactId, String> discovered = discoverContactUrls(remote);
|
||||
for(Entry<ContactId, String> e : discovered.entrySet()) {
|
||||
ContactId c = e.getKey();
|
||||
// Don't create redundant connections
|
||||
if(connected.contains(c)) continue;
|
||||
String url = e.getValue();
|
||||
StreamTransportConnection s = connect(c, url);
|
||||
if(s != null) callback.outgoingConnectionCreated(c, s);
|
||||
|
||||
@@ -10,6 +10,7 @@ import java.util.concurrent.Executor;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.plugins.BatchPluginCallback;
|
||||
import net.sf.briar.api.plugins.PluginExecutor;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
@@ -59,7 +60,7 @@ implements RemovableDriveMonitor.Callback {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public void poll() {
|
||||
public void poll(Collection<ContactId> connected) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
||||
@@ -31,26 +31,15 @@ class SimpleSocketPlugin extends SocketPlugin {
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(SimpleSocketPlugin.class.getName());
|
||||
|
||||
private final long pollingInterval;
|
||||
|
||||
SimpleSocketPlugin(@PluginExecutor Executor pluginExecutor,
|
||||
StreamPluginCallback callback, long pollingInterval) {
|
||||
super(pluginExecutor, callback);
|
||||
this.pollingInterval = pollingInterval;
|
||||
super(pluginExecutor, callback, pollingInterval);
|
||||
}
|
||||
|
||||
public TransportId getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public boolean shouldPoll() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public long getPollingInterval() {
|
||||
return pollingInterval;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Socket createClientSocket() throws IOException {
|
||||
assert running;
|
||||
|
||||
@@ -4,6 +4,7 @@ import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.logging.Level;
|
||||
@@ -24,6 +25,8 @@ abstract class SocketPlugin implements StreamPlugin {
|
||||
protected final Executor pluginExecutor;
|
||||
protected final StreamPluginCallback callback;
|
||||
|
||||
private final long pollingInterval;
|
||||
|
||||
protected boolean running = false; // Locking: this
|
||||
protected ServerSocket socket = null; // Locking: this
|
||||
|
||||
@@ -35,9 +38,10 @@ abstract class SocketPlugin implements StreamPlugin {
|
||||
protected abstract SocketAddress getRemoteSocketAddress(ContactId c);
|
||||
|
||||
protected SocketPlugin(@PluginExecutor Executor pluginExecutor,
|
||||
StreamPluginCallback callback) {
|
||||
StreamPluginCallback callback, long pollingInterval) {
|
||||
this.pluginExecutor = pluginExecutor;
|
||||
this.callback = callback;
|
||||
this.pollingInterval = pollingInterval;
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
@@ -115,13 +119,22 @@ abstract class SocketPlugin implements StreamPlugin {
|
||||
}
|
||||
}
|
||||
|
||||
public void poll() {
|
||||
public boolean shouldPoll() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public long getPollingInterval() {
|
||||
return pollingInterval;
|
||||
}
|
||||
|
||||
public void poll(Collection<ContactId> connected) {
|
||||
synchronized(this) {
|
||||
if(!running) return;
|
||||
}
|
||||
Map<ContactId, TransportProperties> remote =
|
||||
callback.getRemoteProperties();
|
||||
for(final ContactId c : remote.keySet()) {
|
||||
if(connected.contains(c)) continue;
|
||||
pluginExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
connectAndCallBack(c);
|
||||
|
||||
@@ -7,6 +7,7 @@ import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.db.DatabaseExecutor;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.VerificationExecutor;
|
||||
import net.sf.briar.api.protocol.batch.BatchConnectionFactory;
|
||||
@@ -14,6 +15,7 @@ import net.sf.briar.api.transport.BatchTransportReader;
|
||||
import net.sf.briar.api.transport.BatchTransportWriter;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
@@ -22,6 +24,7 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
|
||||
|
||||
private final Executor dbExecutor, verificationExecutor;
|
||||
private final DatabaseComponent db;
|
||||
private final ConnectionRegistry connRegistry;
|
||||
private final ConnectionReaderFactory connReaderFactory;
|
||||
private final ConnectionWriterFactory connWriterFactory;
|
||||
private final ProtocolReaderFactory protoReaderFactory;
|
||||
@@ -30,24 +33,26 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
|
||||
@Inject
|
||||
BatchConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
|
||||
@VerificationExecutor Executor verificationExecutor,
|
||||
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
|
||||
DatabaseComponent db, ConnectionRegistry connRegistry,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory) {
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.verificationExecutor = verificationExecutor;
|
||||
this.db = db;
|
||||
this.connRegistry = connRegistry;
|
||||
this.connReaderFactory = connReaderFactory;
|
||||
this.connWriterFactory = connWriterFactory;
|
||||
this.protoReaderFactory = protoReaderFactory;
|
||||
this.protoWriterFactory = protoWriterFactory;
|
||||
}
|
||||
|
||||
public void createIncomingConnection(ConnectionContext ctx,
|
||||
public void createIncomingConnection(ConnectionContext ctx, TransportId t,
|
||||
BatchTransportReader r, byte[] tag) {
|
||||
final IncomingBatchConnection conn = new IncomingBatchConnection(
|
||||
dbExecutor, verificationExecutor, db, connReaderFactory,
|
||||
protoReaderFactory, ctx, r, tag);
|
||||
dbExecutor, verificationExecutor, db, connRegistry,
|
||||
connReaderFactory, protoReaderFactory, ctx, t, r, tag);
|
||||
Runnable read = new Runnable() {
|
||||
public void run() {
|
||||
conn.read();
|
||||
@@ -56,10 +61,11 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
|
||||
new Thread(read).start();
|
||||
}
|
||||
|
||||
public void createOutgoingConnection(ContactId c, TransportIndex i,
|
||||
BatchTransportWriter w) {
|
||||
public void createOutgoingConnection(ContactId c, TransportId t,
|
||||
TransportIndex i, BatchTransportWriter w) {
|
||||
final OutgoingBatchConnection conn = new OutgoingBatchConnection(db,
|
||||
connWriterFactory, protoWriterFactory, c, i, w);
|
||||
connRegistry, connWriterFactory, protoWriterFactory,
|
||||
c, t, i, w);
|
||||
Runnable write = new Runnable() {
|
||||
public void run() {
|
||||
conn.write();
|
||||
|
||||
@@ -17,6 +17,7 @@ import net.sf.briar.api.protocol.Batch;
|
||||
import net.sf.briar.api.protocol.ProtocolReader;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
import net.sf.briar.api.protocol.UnverifiedBatch;
|
||||
import net.sf.briar.api.protocol.VerificationExecutor;
|
||||
@@ -24,6 +25,7 @@ import net.sf.briar.api.transport.BatchTransportReader;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReader;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
|
||||
class IncomingBatchConnection {
|
||||
|
||||
@@ -31,31 +33,38 @@ class IncomingBatchConnection {
|
||||
Logger.getLogger(IncomingBatchConnection.class.getName());
|
||||
|
||||
private final Executor dbExecutor, verificationExecutor;
|
||||
private final ConnectionReaderFactory connFactory;
|
||||
private final DatabaseComponent db;
|
||||
private final ConnectionRegistry connRegistry;
|
||||
private final ConnectionReaderFactory connFactory;
|
||||
private final ProtocolReaderFactory protoFactory;
|
||||
private final ConnectionContext ctx;
|
||||
private final TransportId transportId;
|
||||
private final BatchTransportReader transport;
|
||||
private final byte[] tag;
|
||||
private final ContactId contactId;
|
||||
|
||||
IncomingBatchConnection(@DatabaseExecutor Executor dbExecutor,
|
||||
@VerificationExecutor Executor verificationExecutor,
|
||||
DatabaseComponent db, ConnectionReaderFactory connFactory,
|
||||
DatabaseComponent db, ConnectionRegistry connRegistry,
|
||||
ConnectionReaderFactory connFactory,
|
||||
ProtocolReaderFactory protoFactory, ConnectionContext ctx,
|
||||
BatchTransportReader transport, byte[] tag) {
|
||||
TransportId transportId, BatchTransportReader transport,
|
||||
byte[] tag) {
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.verificationExecutor = verificationExecutor;
|
||||
this.connFactory = connFactory;
|
||||
this.db = db;
|
||||
this.connRegistry = connRegistry;
|
||||
this.connFactory = connFactory;
|
||||
this.protoFactory = protoFactory;
|
||||
this.ctx = ctx;
|
||||
this.transportId = transportId;
|
||||
this.transport = transport;
|
||||
this.tag = tag;
|
||||
contactId = ctx.getContactId();
|
||||
}
|
||||
|
||||
void read() {
|
||||
connRegistry.registerConnection(contactId, transportId);
|
||||
try {
|
||||
ConnectionReader conn = connFactory.createConnectionReader(
|
||||
transport.getInputStream(), ctx.getSecret(), tag);
|
||||
@@ -83,6 +92,8 @@ class IncomingBatchConnection {
|
||||
} catch(IOException e) {
|
||||
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
|
||||
transport.dispose(true, true);
|
||||
} finally {
|
||||
connRegistry.unregisterConnection(contactId, transportId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,10 +16,12 @@ import net.sf.briar.api.protocol.ProtocolWriter;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
import net.sf.briar.api.transport.BatchTransportWriter;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
import net.sf.briar.api.transport.ConnectionWriter;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
|
||||
@@ -29,25 +31,32 @@ class OutgoingBatchConnection {
|
||||
Logger.getLogger(OutgoingBatchConnection.class.getName());
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final ConnectionRegistry connRegistry;
|
||||
private final ConnectionWriterFactory connFactory;
|
||||
private final ProtocolWriterFactory protoFactory;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final TransportIndex transportIndex;
|
||||
private final BatchTransportWriter transport;
|
||||
|
||||
OutgoingBatchConnection(DatabaseComponent db,
|
||||
ConnectionRegistry connRegistry,
|
||||
ConnectionWriterFactory connFactory,
|
||||
ProtocolWriterFactory protoFactory, ContactId contactId,
|
||||
TransportIndex transportIndex, BatchTransportWriter transport) {
|
||||
TransportId transportId, TransportIndex transportIndex,
|
||||
BatchTransportWriter transport) {
|
||||
this.db = db;
|
||||
this.connRegistry = connRegistry;
|
||||
this.connFactory = connFactory;
|
||||
this.protoFactory = protoFactory;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.transportIndex = transportIndex;
|
||||
this.transport = transport;
|
||||
}
|
||||
|
||||
void write() {
|
||||
connRegistry.registerConnection(contactId, transportId);
|
||||
try {
|
||||
ConnectionContext ctx = db.getConnectionContext(contactId,
|
||||
transportIndex);
|
||||
@@ -97,6 +106,8 @@ class OutgoingBatchConnection {
|
||||
} catch(IOException e) {
|
||||
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
|
||||
transport.dispose(true);
|
||||
} finally {
|
||||
connRegistry.unregisterConnection(contactId, transportId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,10 +7,12 @@ import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.db.DatabaseExecutor;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.VerificationExecutor;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReader;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
import net.sf.briar.api.transport.ConnectionWriter;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
import net.sf.briar.api.transport.StreamTransportConnection;
|
||||
@@ -22,15 +24,16 @@ class IncomingStreamConnection extends StreamConnection {
|
||||
|
||||
IncomingStreamConnection(@DatabaseExecutor Executor dbExecutor,
|
||||
@VerificationExecutor Executor verificationExecutor,
|
||||
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
|
||||
DatabaseComponent db, ConnectionRegistry connRegistry,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory,
|
||||
ConnectionContext ctx, StreamTransportConnection transport,
|
||||
byte[] tag) {
|
||||
super(dbExecutor, verificationExecutor, db, connReaderFactory,
|
||||
connWriterFactory, protoReaderFactory, protoWriterFactory,
|
||||
ctx.getContactId(), transport);
|
||||
ConnectionContext ctx, TransportId transportId,
|
||||
StreamTransportConnection transport, byte[] tag) {
|
||||
super(dbExecutor, verificationExecutor, db, connRegistry,
|
||||
connReaderFactory, connWriterFactory, protoReaderFactory,
|
||||
protoWriterFactory, ctx.getContactId(), transportId, transport);
|
||||
this.ctx = ctx;
|
||||
this.tag = tag;
|
||||
}
|
||||
|
||||
@@ -9,11 +9,13 @@ import net.sf.briar.api.db.DatabaseExecutor;
|
||||
import net.sf.briar.api.db.DbException;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.VerificationExecutor;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReader;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
import net.sf.briar.api.transport.ConnectionWriter;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
import net.sf.briar.api.transport.StreamTransportConnection;
|
||||
@@ -26,15 +28,16 @@ class OutgoingStreamConnection extends StreamConnection {
|
||||
|
||||
OutgoingStreamConnection(@DatabaseExecutor Executor dbExecutor,
|
||||
@VerificationExecutor Executor verificationExecutor,
|
||||
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
|
||||
DatabaseComponent db, ConnectionRegistry connRegistry,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
|
||||
TransportIndex transportIndex,
|
||||
TransportId transportId, TransportIndex transportIndex,
|
||||
StreamTransportConnection transport) {
|
||||
super(dbExecutor, verificationExecutor, db, connReaderFactory,
|
||||
connWriterFactory, protoReaderFactory, protoWriterFactory,
|
||||
contactId, transport);
|
||||
super(dbExecutor, verificationExecutor, db, connRegistry,
|
||||
connReaderFactory, connWriterFactory, protoReaderFactory,
|
||||
protoWriterFactory, contactId, transportId, transport);
|
||||
this.transportIndex = transportIndex;
|
||||
}
|
||||
|
||||
|
||||
@@ -40,11 +40,13 @@ import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
import net.sf.briar.api.protocol.UnverifiedBatch;
|
||||
import net.sf.briar.api.protocol.VerificationExecutor;
|
||||
import net.sf.briar.api.transport.ConnectionReader;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
import net.sf.briar.api.transport.ConnectionWriter;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
import net.sf.briar.api.transport.StreamTransportConnection;
|
||||
@@ -59,11 +61,13 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
};
|
||||
|
||||
protected final DatabaseComponent db;
|
||||
protected final ConnectionRegistry connRegistry;
|
||||
protected final ConnectionReaderFactory connReaderFactory;
|
||||
protected final ConnectionWriterFactory connWriterFactory;
|
||||
protected final ProtocolReaderFactory protoReaderFactory;
|
||||
protected final ProtocolWriterFactory protoWriterFactory;
|
||||
protected final ContactId contactId;
|
||||
protected final TransportId transportId;
|
||||
protected final StreamTransportConnection transport;
|
||||
|
||||
private final Executor dbExecutor, verificationExecutor;
|
||||
@@ -76,19 +80,22 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
|
||||
StreamConnection(@DatabaseExecutor Executor dbExecutor,
|
||||
@VerificationExecutor Executor verificationExecutor,
|
||||
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
|
||||
DatabaseComponent db, ConnectionRegistry connRegistry,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
|
||||
StreamTransportConnection transport) {
|
||||
TransportId transportId, StreamTransportConnection transport) {
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.verificationExecutor = verificationExecutor;
|
||||
this.db = db;
|
||||
this.connRegistry = connRegistry;
|
||||
this.connReaderFactory = connReaderFactory;
|
||||
this.connWriterFactory = connWriterFactory;
|
||||
this.protoReaderFactory = protoReaderFactory;
|
||||
this.protoWriterFactory = protoWriterFactory;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.transport = transport;
|
||||
canSendOffer = new AtomicBoolean(false);
|
||||
disposed = new AtomicBoolean(false);
|
||||
@@ -188,8 +195,9 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
}
|
||||
|
||||
void write() {
|
||||
connRegistry.registerConnection(contactId, transportId);
|
||||
db.addListener(this);
|
||||
try {
|
||||
db.addListener(this);
|
||||
OutputStream out = createConnectionWriter().getOutputStream();
|
||||
writer = protoWriterFactory.createProtocolWriter(out,
|
||||
transport.shouldFlush());
|
||||
@@ -217,6 +225,7 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
|
||||
if(!disposed.getAndSet(true)) transport.dispose(true, true);
|
||||
} finally {
|
||||
connRegistry.unregisterConnection(contactId, transportId);
|
||||
db.removeListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,11 +7,13 @@ import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.db.DatabaseExecutor;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.VerificationExecutor;
|
||||
import net.sf.briar.api.protocol.stream.StreamConnectionFactory;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
import net.sf.briar.api.transport.StreamTransportConnection;
|
||||
|
||||
@@ -21,6 +23,7 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
|
||||
|
||||
private final Executor dbExecutor, verificationExecutor;
|
||||
private final DatabaseComponent db;
|
||||
private final ConnectionRegistry connRegistry;
|
||||
private final ConnectionReaderFactory connReaderFactory;
|
||||
private final ConnectionWriterFactory connWriterFactory;
|
||||
private final ProtocolReaderFactory protoReaderFactory;
|
||||
@@ -29,24 +32,27 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
|
||||
@Inject
|
||||
StreamConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
|
||||
@VerificationExecutor Executor verificationExecutor,
|
||||
DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
|
||||
DatabaseComponent db, ConnectionRegistry connRegistry,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory) {
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.verificationExecutor = verificationExecutor;
|
||||
this.db = db;
|
||||
this.connRegistry = connRegistry;
|
||||
this.connReaderFactory = connReaderFactory;
|
||||
this.connWriterFactory = connWriterFactory;
|
||||
this.protoReaderFactory = protoReaderFactory;
|
||||
this.protoWriterFactory = protoWriterFactory;
|
||||
}
|
||||
|
||||
public void createIncomingConnection(ConnectionContext ctx,
|
||||
public void createIncomingConnection(ConnectionContext ctx, TransportId t,
|
||||
StreamTransportConnection s, byte[] tag) {
|
||||
final StreamConnection conn = new IncomingStreamConnection(dbExecutor,
|
||||
verificationExecutor, db, connReaderFactory, connWriterFactory,
|
||||
protoReaderFactory, protoWriterFactory, ctx, s, tag);
|
||||
verificationExecutor, db, connRegistry, connReaderFactory,
|
||||
connWriterFactory, protoReaderFactory, protoWriterFactory,
|
||||
ctx, t, s, tag);
|
||||
Runnable write = new Runnable() {
|
||||
public void run() {
|
||||
conn.write();
|
||||
@@ -61,11 +67,12 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
|
||||
new Thread(read).start();
|
||||
}
|
||||
|
||||
public void createOutgoingConnection(ContactId c, TransportIndex i,
|
||||
StreamTransportConnection s) {
|
||||
public void createOutgoingConnection(ContactId c, TransportId t,
|
||||
TransportIndex i, StreamTransportConnection s) {
|
||||
final StreamConnection conn = new OutgoingStreamConnection(dbExecutor,
|
||||
verificationExecutor, db, connReaderFactory, connWriterFactory,
|
||||
protoReaderFactory, protoWriterFactory, c, i, s);
|
||||
verificationExecutor, db, connRegistry, connReaderFactory,
|
||||
connWriterFactory, protoReaderFactory, protoWriterFactory,
|
||||
c, t, i, s);
|
||||
Runnable write = new Runnable() {
|
||||
public void run() {
|
||||
conn.write();
|
||||
|
||||
@@ -49,9 +49,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
executor.execute(new DispatchBatchConnection(t, r));
|
||||
}
|
||||
|
||||
public void dispatchWriter(ContactId c, TransportIndex i,
|
||||
public void dispatchWriter(ContactId c, TransportId t, TransportIndex i,
|
||||
BatchTransportWriter w) {
|
||||
batchConnFactory.createOutgoingConnection(c, i, w);
|
||||
batchConnFactory.createOutgoingConnection(c, t, i, w);
|
||||
}
|
||||
|
||||
public void dispatchIncomingConnection(TransportId t,
|
||||
@@ -59,9 +59,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
executor.execute(new DispatchStreamConnection(t, s));
|
||||
}
|
||||
|
||||
public void dispatchOutgoingConnection(ContactId c, TransportIndex i,
|
||||
StreamTransportConnection s) {
|
||||
streamConnFactory.createOutgoingConnection(c, i, s);
|
||||
public void dispatchOutgoingConnection(ContactId c, TransportId t,
|
||||
TransportIndex i, StreamTransportConnection s) {
|
||||
streamConnFactory.createOutgoingConnection(c, t, i, s);
|
||||
}
|
||||
|
||||
private byte[] readTag(InputStream in) throws IOException {
|
||||
@@ -92,8 +92,8 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
ConnectionContext ctx = recogniser.acceptConnection(transportId,
|
||||
tag);
|
||||
if(ctx == null) transport.dispose(false, false);
|
||||
else batchConnFactory.createIncomingConnection(ctx, transport,
|
||||
tag);
|
||||
else batchConnFactory.createIncomingConnection(ctx, transportId,
|
||||
transport, tag);
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
|
||||
transport.dispose(true, false);
|
||||
@@ -121,8 +121,8 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
|
||||
ConnectionContext ctx = recogniser.acceptConnection(transportId,
|
||||
tag);
|
||||
if(ctx == null) transport.dispose(false, false);
|
||||
else streamConnFactory.createIncomingConnection(ctx, transport,
|
||||
tag);
|
||||
else streamConnFactory.createIncomingConnection(ctx,
|
||||
transportId, transport, tag);
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
|
||||
transport.dispose(true, false);
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package net.sf.briar.transport;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
|
||||
public class ConnectionRegistryImpl implements ConnectionRegistry {
|
||||
|
||||
// Locking: this
|
||||
private final Map<TransportId, Map<ContactId, Integer>> connections;
|
||||
|
||||
ConnectionRegistryImpl() {
|
||||
connections = new HashMap<TransportId, Map<ContactId, Integer>>();
|
||||
}
|
||||
|
||||
public synchronized void registerConnection(ContactId c, TransportId t) {
|
||||
Map<ContactId, Integer> m = connections.get(t);
|
||||
if(m == null) {
|
||||
m = new HashMap<ContactId, Integer>();
|
||||
connections.put(t, m);
|
||||
}
|
||||
Integer count = m.get(c);
|
||||
if(count == null) m.put(c, 1);
|
||||
else m.put(c, count + 1);
|
||||
}
|
||||
|
||||
public synchronized void unregisterConnection(ContactId c, TransportId t) {
|
||||
Map<ContactId, Integer> m = connections.get(t);
|
||||
if(m == null) throw new IllegalArgumentException();
|
||||
Integer count = m.remove(c);
|
||||
if(count == null) throw new IllegalArgumentException();
|
||||
if(count == 1) {
|
||||
if(m.isEmpty()) connections.remove(t);
|
||||
} else {
|
||||
m.put(c, count - 1);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Collection<ContactId> getConnectedContacts(
|
||||
TransportId t) {
|
||||
Map<ContactId, Integer> m = connections.get(t);
|
||||
if(m == null) return Collections.emptyList();
|
||||
List<ContactId> keys = new ArrayList<ContactId>(m.keySet());
|
||||
return Collections.unmodifiableList(keys);
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import net.sf.briar.api.transport.ConnectionDispatcher;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionRecogniser;
|
||||
import net.sf.briar.api.transport.ConnectionRecogniserExecutor;
|
||||
import net.sf.briar.api.transport.ConnectionRegistry;
|
||||
import net.sf.briar.api.transport.ConnectionWindowFactory;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
|
||||
@@ -23,6 +24,7 @@ public class TransportModule extends AbstractModule {
|
||||
bind(ConnectionReaderFactory.class).to(
|
||||
ConnectionReaderFactoryImpl.class);
|
||||
bind(ConnectionRecogniser.class).to(ConnectionRecogniserImpl.class);
|
||||
bind(ConnectionRegistry.class).toInstance(new ConnectionRegistryImpl());
|
||||
bind(ConnectionWindowFactory.class).to(
|
||||
ConnectionWindowFactoryImpl.class);
|
||||
bind(ConnectionWriterFactory.class).to(
|
||||
|
||||
Reference in New Issue
Block a user