Moved ConnectionDispatcher and ConnectionRegistry to plugins package.

This commit is contained in:
akwizgran
2014-11-05 19:37:13 +00:00
parent 26d93b83b4
commit 4ca83842d1
21 changed files with 36 additions and 69 deletions

View File

@@ -0,0 +1,418 @@
package org.briarproject.plugins;
import static java.util.logging.Level.WARNING;
import static org.briarproject.api.transport.TransportConstants.TAG_LENGTH;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.crypto.KeyManager;
import org.briarproject.api.db.DbException;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.MessagingSessionFactory;
import org.briarproject.api.plugins.ConnectionDispatcher;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.TransportConnectionReader;
import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.transport.StreamContext;
import org.briarproject.api.transport.StreamReader;
import org.briarproject.api.transport.StreamReaderFactory;
import org.briarproject.api.transport.StreamWriter;
import org.briarproject.api.transport.StreamWriterFactory;
import org.briarproject.api.transport.TagRecogniser;
class ConnectionDispatcherImpl implements ConnectionDispatcher {
private static final Logger LOG =
Logger.getLogger(ConnectionDispatcherImpl.class.getName());
private final Executor ioExecutor;
private final KeyManager keyManager;
private final TagRecogniser tagRecogniser;
private final StreamReaderFactory streamReaderFactory;
private final StreamWriterFactory streamWriterFactory;
private final MessagingSessionFactory messagingSessionFactory;
private final ConnectionRegistry connectionRegistry;
@Inject
ConnectionDispatcherImpl(@IoExecutor Executor ioExecutor,
KeyManager keyManager, TagRecogniser tagRecogniser,
StreamReaderFactory streamReaderFactory,
StreamWriterFactory streamWriterFactory,
MessagingSessionFactory messagingSessionFactory,
ConnectionRegistry connectionRegistry) {
this.ioExecutor = ioExecutor;
this.keyManager = keyManager;
this.tagRecogniser = tagRecogniser;
this.streamReaderFactory = streamReaderFactory;
this.streamWriterFactory = streamWriterFactory;
this.messagingSessionFactory = messagingSessionFactory;
this.connectionRegistry = connectionRegistry;
}
public void dispatchIncomingConnection(TransportId t,
TransportConnectionReader r) {
ioExecutor.execute(new DispatchIncomingSimplexConnection(t, r));
}
public void dispatchIncomingConnection(TransportId t,
DuplexTransportConnection d) {
ioExecutor.execute(new DispatchIncomingDuplexConnection(t, d));
}
public void dispatchOutgoingConnection(ContactId c, TransportId t,
TransportConnectionWriter w) {
ioExecutor.execute(new DispatchOutgoingSimplexConnection(c, t, w));
}
public void dispatchOutgoingConnection(ContactId c, TransportId t,
DuplexTransportConnection d) {
ioExecutor.execute(new DispatchOutgoingDuplexConnection(c, t, d));
}
private byte[] readTag(TransportId t, TransportConnectionReader r)
throws IOException {
// Read the tag
byte[] tag = new byte[TAG_LENGTH];
InputStream in = r.getInputStream();
int offset = 0;
while(offset < tag.length) {
int read = in.read(tag, offset, tag.length - offset);
if(read == -1) throw new EOFException();
offset += read;
}
return tag;
}
private MessagingSession createIncomingSession(StreamContext ctx,
TransportConnectionReader r) throws IOException {
InputStream in = r.getInputStream();
StreamReader streamReader = streamReaderFactory.createStreamReader(in,
r.getMaxFrameLength(), ctx);
return messagingSessionFactory.createIncomingSession(ctx.getContactId(),
streamReader.getInputStream());
}
private MessagingSession createOutgoingSession(StreamContext ctx,
TransportConnectionWriter w, boolean duplex) throws IOException {
OutputStream out = w.getOutputStream();
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out,
w.getMaxFrameLength(), ctx);
return messagingSessionFactory.createOutgoingSession(ctx.getContactId(),
ctx.getTransportId(), w.getMaxLatency(),
streamWriter.getOutputStream(), duplex);
}
private class DispatchIncomingSimplexConnection implements Runnable {
private final TransportId transportId;
private final TransportConnectionReader reader;
private DispatchIncomingSimplexConnection(TransportId transportId,
TransportConnectionReader reader) {
this.transportId = transportId;
this.reader = reader;
}
public void run() {
// Read and recognise the tag
StreamContext ctx;
try {
byte[] tag = readTag(transportId, reader);
ctx = tagRecogniser.recogniseTag(transportId, tag);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
}
if(ctx == null) {
LOG.info("Unrecognised tag");
disposeReader(true, false);
return;
}
ContactId contactId = ctx.getContactId();
connectionRegistry.registerConnection(contactId, transportId);
try {
// Create and run the incoming session
createIncomingSession(ctx, reader).run();
disposeReader(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
private void disposeReader(boolean exception, boolean recognised) {
try {
reader.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class DispatchOutgoingSimplexConnection implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionWriter writer;
private DispatchOutgoingSimplexConnection(ContactId contactId,
TransportId transportId, TransportConnectionWriter writer) {
this.contactId = contactId;
this.transportId = transportId;
this.writer = writer;
}
public void run() {
// Allocate a stream context
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId);
try {
// Create and run the outgoing session
createOutgoingSession(ctx, writer, false).run();
disposeWriter(false);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
private void disposeWriter(boolean exception) {
try {
writer.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class DispatchIncomingDuplexConnection implements Runnable {
private final TransportId transportId;
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private volatile ContactId contactId = null;
private volatile MessagingSession incomingSession = null;
private volatile MessagingSession outgoingSession = null;
private DispatchIncomingDuplexConnection(TransportId transportId,
DuplexTransportConnection transport) {
this.transportId = transportId;
reader = transport.getReader();
writer = transport.getWriter();
}
public void run() {
// Read and recognise the tag
StreamContext ctx;
try {
byte[] tag = readTag(transportId, reader);
ctx = tagRecogniser.recogniseTag(transportId, tag);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, false);
return;
}
if(ctx == null) {
LOG.info("Unrecognised tag");
disposeReader(true, false);
return;
}
contactId = ctx.getContactId();
connectionRegistry.registerConnection(contactId, transportId);
// Start the outgoing session on another thread
ioExecutor.execute(new Runnable() {
public void run() {
runOutgoingSession();
}
});
try {
// Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader);
incomingSession.run();
disposeReader(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
private void runOutgoingSession() {
// Allocate a stream context
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
return;
}
try {
// Create and run the outgoing session
outgoingSession = createOutgoingSession(ctx, writer, true);
outgoingSession.run();
disposeWriter(false);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
}
}
private void disposeReader(boolean exception, boolean recognised) {
if(exception && outgoingSession != null)
outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if(exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class DispatchOutgoingDuplexConnection implements Runnable {
private final ContactId contactId;
private final TransportId transportId;
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private volatile MessagingSession incomingSession = null;
private volatile MessagingSession outgoingSession = null;
private DispatchOutgoingDuplexConnection(ContactId contactId,
TransportId transportId, DuplexTransportConnection transport) {
this.contactId = contactId;
this.transportId = transportId;
reader = transport.getReader();
writer = transport.getWriter();
}
public void run() {
// Allocate a stream context
StreamContext ctx = keyManager.getStreamContext(contactId,
transportId);
if(ctx == null) {
LOG.warning("Could not allocate stream context");
disposeWriter(true);
return;
}
connectionRegistry.registerConnection(contactId, transportId);
// Start the incoming session on another thread
ioExecutor.execute(new Runnable() {
public void run() {
runIncomingSession();
}
});
try {
// Create and run the outgoing session
outgoingSession = createOutgoingSession(ctx, writer, true);
outgoingSession.run();
disposeWriter(false);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId);
}
}
private void runIncomingSession() {
// Read and recognise the tag
StreamContext ctx;
try {
byte[] tag = readTag(transportId, reader);
ctx = tagRecogniser.recogniseTag(transportId, tag);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
return;
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
return;
}
// Unrecognised tags are suspicious in this case
if(ctx == null) {
LOG.warning("Unrecognised tag for returning stream");
disposeReader(true, true);
return;
}
// Check that the stream comes from the expected contact
if(!ctx.getContactId().equals(contactId)) {
LOG.warning("Wrong contact ID for returning stream");
disposeReader(true, true);
return;
}
try {
// Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader);
incomingSession.run();
disposeReader(false, true);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true);
}
}
private void disposeReader(boolean exception, boolean recognised) {
if(exception && outgoingSession != null)
outgoingSession.interrupt();
try {
reader.dispose(exception, recognised);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
private void disposeWriter(boolean exception) {
if(exception && incomingSession != null)
incomingSession.interrupt();
try {
writer.dispose(exception);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
}

View File

@@ -0,0 +1,106 @@
package org.briarproject.plugins;
import static java.util.logging.Level.INFO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId;
import org.briarproject.api.event.ContactConnectedEvent;
import org.briarproject.api.event.ContactDisconnectedEvent;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.plugins.ConnectionRegistry;
import com.google.inject.Inject;
class ConnectionRegistryImpl implements ConnectionRegistry {
private static final Logger LOG =
Logger.getLogger(ConnectionRegistryImpl.class.getName());
private final EventBus eventBus;
// Locking: this
private final Map<TransportId, Map<ContactId, Integer>> connections;
// Locking: this
private final Map<ContactId, Integer> contactCounts;
@Inject
ConnectionRegistryImpl(EventBus eventBus) {
this.eventBus = eventBus;
connections = new HashMap<TransportId, Map<ContactId, Integer>>();
contactCounts = new HashMap<ContactId, Integer>();
}
public void registerConnection(ContactId c, TransportId t) {
LOG.info("Connection registered");
boolean firstConnection = false;
synchronized(this) {
Map<ContactId, Integer> m = connections.get(t);
if(m == null) {
m = new HashMap<ContactId, Integer>();
connections.put(t, m);
}
Integer count = m.get(c);
if(count == null) m.put(c, 1);
else m.put(c, count + 1);
count = contactCounts.get(c);
if(count == null) {
firstConnection = true;
contactCounts.put(c, 1);
} else {
contactCounts.put(c, count + 1);
}
}
if(firstConnection) {
LOG.info("Contact connected");
eventBus.broadcast(new ContactConnectedEvent(c));
}
}
public void unregisterConnection(ContactId c, TransportId t) {
LOG.info("Connection unregistered");
boolean lastConnection = false;
synchronized(this) {
Map<ContactId, Integer> m = connections.get(t);
if(m == null) throw new IllegalArgumentException();
Integer count = m.remove(c);
if(count == null) throw new IllegalArgumentException();
if(count == 1) {
if(m.isEmpty()) connections.remove(t);
} else {
m.put(c, count - 1);
}
count = contactCounts.get(c);
if(count == null) throw new IllegalArgumentException();
if(count == 1) {
lastConnection = true;
contactCounts.remove(c);
} else {
contactCounts.put(c, count - 1);
}
}
if(lastConnection) {
LOG.info("Contact disconnected");
eventBus.broadcast(new ContactDisconnectedEvent(c));
}
}
public synchronized Collection<ContactId> getConnectedContacts(
TransportId t) {
Map<ContactId, Integer> m = connections.get(t);
if(m == null) return Collections.emptyList();
List<ContactId> ids = new ArrayList<ContactId>(m.keySet());
if(LOG.isLoggable(INFO)) LOG.info(ids.size() + " contacts connected");
return Collections.unmodifiableList(ids);
}
public synchronized boolean isConnected(ContactId c) {
return contactCounts.containsKey(c);
}
}

View File

@@ -24,6 +24,7 @@ import org.briarproject.api.TransportProperties;
import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.ConnectionDispatcher;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.plugins.PluginCallback;
import org.briarproject.api.plugins.PluginManager;
@@ -39,7 +40,6 @@ import org.briarproject.api.plugins.simplex.SimplexPluginCallback;
import org.briarproject.api.plugins.simplex.SimplexPluginConfig;
import org.briarproject.api.plugins.simplex.SimplexPluginFactory;
import org.briarproject.api.system.Clock;
import org.briarproject.api.transport.ConnectionDispatcher;
import org.briarproject.api.ui.UiCallback;
// FIXME: Don't make alien calls with a lock held (that includes waiting on a

View File

@@ -3,6 +3,8 @@ package org.briarproject.plugins;
import javax.inject.Singleton;
import org.briarproject.api.lifecycle.LifecycleManager;
import org.briarproject.api.plugins.ConnectionDispatcher;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.PluginManager;
import com.google.inject.AbstractModule;
@@ -13,6 +15,9 @@ public class PluginsModule extends AbstractModule {
@Override
protected void configure() {
bind(Poller.class).to(PollerImpl.class);
bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class);
bind(ConnectionRegistry.class).to(
ConnectionRegistryImpl.class).in(Singleton.class);
}
@Provides @Singleton

View File

@@ -10,9 +10,9 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import org.briarproject.api.lifecycle.IoExecutor;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.Plugin;
import org.briarproject.api.system.Timer;
import org.briarproject.api.transport.ConnectionRegistry;
class PollerImpl implements Poller {