Try to reconnect on connection loss. #262

This commit is contained in:
akwizgran
2016-05-04 11:30:41 +01:00
parent 5f5ceedc29
commit dd9bc74262
7 changed files with 134 additions and 29 deletions

View File

@@ -0,0 +1,30 @@
package org.briarproject.api.event;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
public class ConnectionClosedEvent extends Event {
private final ContactId contactId;
private final TransportId transportId;
private final boolean incoming;
public ConnectionClosedEvent(ContactId contactId, TransportId transportId,
boolean incoming) {
this.contactId = contactId;
this.transportId = transportId;
this.incoming = incoming;
}
public ContactId getContactId() {
return contactId;
}
public TransportId getTransportId() {
return transportId;
}
public boolean isIncoming() {
return incoming;
}
}

View File

@@ -0,0 +1,30 @@
package org.briarproject.api.event;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
public class ConnectionOpenedEvent extends Event {
private final ContactId contactId;
private final TransportId transportId;
private final boolean incoming;
public ConnectionOpenedEvent(ContactId contactId, TransportId transportId,
boolean incoming) {
this.contactId = contactId;
this.transportId = transportId;
this.incoming = incoming;
}
public ContactId getContactId() {
return contactId;
}
public TransportId getTransportId() {
return transportId;
}
public boolean isIncoming() {
return incoming;
}
}

View File

@@ -10,9 +10,9 @@ import java.util.Collection;
*/ */
public interface ConnectionRegistry { public interface ConnectionRegistry {
void registerConnection(ContactId c, TransportId t); void registerConnection(ContactId c, TransportId t, boolean incoming);
void unregisterConnection(ContactId c, TransportId t); void unregisterConnection(ContactId c, TransportId t, boolean incoming);
Collection<ContactId> getConnectedContacts(TransportId t); Collection<ContactId> getConnectedContacts(TransportId t);

View File

@@ -144,7 +144,7 @@ class ConnectionManagerImpl implements ConnectionManager {
return; return;
} }
ContactId contactId = ctx.getContactId(); ContactId contactId = ctx.getContactId();
connectionRegistry.registerConnection(contactId, transportId); connectionRegistry.registerConnection(contactId, transportId, true);
try { try {
// Create and run the incoming session // Create and run the incoming session
createIncomingSession(ctx, reader).run(); createIncomingSession(ctx, reader).run();
@@ -153,7 +153,8 @@ class ConnectionManagerImpl implements ConnectionManager {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true); disposeReader(true, true);
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId); connectionRegistry.unregisterConnection(contactId, transportId,
true);
} }
} }
@@ -194,7 +195,8 @@ class ConnectionManagerImpl implements ConnectionManager {
disposeWriter(true); disposeWriter(true);
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId); connectionRegistry.registerConnection(contactId, transportId,
false);
try { try {
// Create and run the outgoing session // Create and run the outgoing session
createSimplexOutgoingSession(ctx, writer).run(); createSimplexOutgoingSession(ctx, writer).run();
@@ -203,7 +205,8 @@ class ConnectionManagerImpl implements ConnectionManager {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeWriter(true); disposeWriter(true);
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId); connectionRegistry.unregisterConnection(contactId, transportId,
false);
} }
} }
@@ -254,7 +257,7 @@ class ConnectionManagerImpl implements ConnectionManager {
return; return;
} }
contactId = ctx.getContactId(); contactId = ctx.getContactId();
connectionRegistry.registerConnection(contactId, transportId); connectionRegistry.registerConnection(contactId, transportId, true);
// Start the outgoing session on another thread // Start the outgoing session on another thread
ioExecutor.execute(new Runnable() { ioExecutor.execute(new Runnable() {
public void run() { public void run() {
@@ -270,7 +273,8 @@ class ConnectionManagerImpl implements ConnectionManager {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true); disposeReader(true, true);
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId); connectionRegistry.unregisterConnection(contactId, transportId,
true);
} }
} }
@@ -398,7 +402,8 @@ class ConnectionManagerImpl implements ConnectionManager {
disposeReader(true, true); disposeReader(true, true);
return; return;
} }
connectionRegistry.registerConnection(contactId, transportId); connectionRegistry.registerConnection(contactId, transportId,
false);
try { try {
// Create and run the incoming session // Create and run the incoming session
incomingSession = createIncomingSession(ctx, reader); incomingSession = createIncomingSession(ctx, reader);
@@ -408,7 +413,8 @@ class ConnectionManagerImpl implements ConnectionManager {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
disposeReader(true, true); disposeReader(true, true);
} finally { } finally {
connectionRegistry.unregisterConnection(contactId, transportId); connectionRegistry.unregisterConnection(contactId, transportId,
false);
} }
} }

View File

@@ -2,6 +2,8 @@ package org.briarproject.plugins;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId; import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ConnectionOpenedEvent;
import org.briarproject.api.event.ContactConnectedEvent; import org.briarproject.api.event.ContactConnectedEvent;
import org.briarproject.api.event.ContactDisconnectedEvent; import org.briarproject.api.event.ContactDisconnectedEvent;
import org.briarproject.api.event.EventBus; import org.briarproject.api.event.EventBus;
@@ -40,8 +42,12 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
contactCounts = new HashMap<ContactId, Integer>(); contactCounts = new HashMap<ContactId, Integer>();
} }
public void registerConnection(ContactId c, TransportId t) { public void registerConnection(ContactId c, TransportId t,
if (LOG.isLoggable(INFO)) LOG.info("Connection registered: " + t); boolean incoming) {
if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection registered: " + t);
else LOG.info("Outgoing connection registered: " + t);
}
boolean firstConnection = false; boolean firstConnection = false;
lock.lock(); lock.lock();
try { try {
@@ -63,14 +69,19 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
} finally { } finally {
lock.unlock(); lock.unlock();
} }
eventBus.broadcast(new ConnectionOpenedEvent(c, t, incoming));
if (firstConnection) { if (firstConnection) {
LOG.info("Contact connected"); LOG.info("Contact connected");
eventBus.broadcast(new ContactConnectedEvent(c)); eventBus.broadcast(new ContactConnectedEvent(c));
} }
} }
public void unregisterConnection(ContactId c, TransportId t) { public void unregisterConnection(ContactId c, TransportId t,
if (LOG.isLoggable(INFO)) LOG.info("Connection unregistered: " + t); boolean incoming) {
if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection unregistered: " + t);
else LOG.info("Outgoing connection unregistered: " + t);
}
boolean lastConnection = false; boolean lastConnection = false;
lock.lock(); lock.lock();
try { try {
@@ -94,6 +105,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
} finally { } finally {
lock.unlock(); lock.unlock();
} }
eventBus.broadcast(new ConnectionClosedEvent(c, t, incoming));
if (lastConnection) { if (lastConnection) {
LOG.info("Contact disconnected"); LOG.info("Contact disconnected");
eventBus.broadcast(new ContactDisconnectedEvent(c)); eventBus.broadcast(new ContactDisconnectedEvent(c));

View File

@@ -3,6 +3,7 @@ package org.briarproject.plugins;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId; import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DbException; import org.briarproject.api.db.DbException;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent; import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.Event; import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus; import org.briarproject.api.event.EventBus;
@@ -163,7 +164,16 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
public void eventOccurred(Event e) { public void eventOccurred(Event e) {
if (e instanceof ContactStatusChangedEvent) { if (e instanceof ContactStatusChangedEvent) {
ContactStatusChangedEvent c = (ContactStatusChangedEvent) e; ContactStatusChangedEvent c = (ContactStatusChangedEvent) e;
if (c.isActive()) connectToContact(c.getContactId()); if (c.isActive()) {
// Connect to the newly activated contact
connectToContact(c.getContactId());
}
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
} }
} }
@@ -174,6 +184,14 @@ class PluginManagerImpl implements PluginManager, Service, EventListener {
if (d.shouldPoll()) connectToContact(c, d); if (d.shouldPoll()) connectToContact(c, d);
} }
private void connectToContact(ContactId c, TransportId t) {
Plugin p = plugins.get(t);
if (p instanceof SimplexPlugin && p.shouldPoll())
connectToContact(c, (SimplexPlugin) p);
else if (p instanceof DuplexPlugin && p.shouldPoll())
connectToContact(c, (DuplexPlugin) p);
}
private void connectToContact(final ContactId c, final SimplexPlugin p) { private void connectToContact(final ContactId c, final SimplexPlugin p) {
ioExecutor.execute(new Runnable() { ioExecutor.execute(new Runnable() {
public void run() { public void run() {

View File

@@ -3,6 +3,8 @@ package org.briarproject.plugins;
import org.briarproject.BriarTestCase; import org.briarproject.BriarTestCase;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId; import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ConnectionOpenedEvent;
import org.briarproject.api.event.ContactConnectedEvent; import org.briarproject.api.event.ContactConnectedEvent;
import org.briarproject.api.event.ContactDisconnectedEvent; import org.briarproject.api.event.ContactDisconnectedEvent;
import org.briarproject.api.event.EventBus; import org.briarproject.api.event.EventBus;
@@ -35,6 +37,10 @@ public class ConnectionRegistryImplTest extends BriarTestCase {
Mockery context = new Mockery(); Mockery context = new Mockery();
final EventBus eventBus = context.mock(EventBus.class); final EventBus eventBus = context.mock(EventBus.class);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
exactly(5).of(eventBus).broadcast(with(any(
ConnectionOpenedEvent.class)));
exactly(2).of(eventBus).broadcast(with(any(
ConnectionClosedEvent.class)));
exactly(3).of(eventBus).broadcast(with(any( exactly(3).of(eventBus).broadcast(with(any(
ContactConnectedEvent.class))); ContactConnectedEvent.class)));
oneOf(eventBus).broadcast(with(any( oneOf(eventBus).broadcast(with(any(
@@ -49,43 +55,46 @@ public class ConnectionRegistryImplTest extends BriarTestCase {
assertEquals(Collections.emptyList(), assertEquals(Collections.emptyList(),
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
// Check that a registered connection shows up - this should // Check that a registered connection shows up - this should
// broadcast a ContactConnectedEvent // broadcast a ConnectionOpenedEvent and a ContactConnectedEvent
c.registerConnection(contactId, transportId); c.registerConnection(contactId, transportId, true);
assertEquals(Collections.singletonList(contactId), assertEquals(Collections.singletonList(contactId),
c.getConnectedContacts(transportId)); c.getConnectedContacts(transportId));
assertEquals(Collections.emptyList(), assertEquals(Collections.emptyList(),
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
// Register an identical connection - lookup should be unaffected // Register an identical connection - this should broadcast a
c.registerConnection(contactId, transportId); // ConnectionOpenedEvent and lookup should be unaffected
c.registerConnection(contactId, transportId, true);
assertEquals(Collections.singletonList(contactId), assertEquals(Collections.singletonList(contactId),
c.getConnectedContacts(transportId)); c.getConnectedContacts(transportId));
assertEquals(Collections.emptyList(), assertEquals(Collections.emptyList(),
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
// Unregister one of the connections - lookup should be unaffected // Unregister one of the connections - this should broadcast a
c.unregisterConnection(contactId, transportId); // ConnectionClosedEvent and lookup should be unaffected
c.unregisterConnection(contactId, transportId, true);
assertEquals(Collections.singletonList(contactId), assertEquals(Collections.singletonList(contactId),
c.getConnectedContacts(transportId)); c.getConnectedContacts(transportId));
assertEquals(Collections.emptyList(), assertEquals(Collections.emptyList(),
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
// Unregister the other connection - lookup should be affected - // Unregister the other connection - this should broadcast a
// this should broadcast a ContactDisconnectedEvent // ConnectionClosedEvent and a ContactDisconnectedEvent
c.unregisterConnection(contactId, transportId); c.unregisterConnection(contactId, transportId, true);
assertEquals(Collections.emptyList(), assertEquals(Collections.emptyList(),
c.getConnectedContacts(transportId)); c.getConnectedContacts(transportId));
assertEquals(Collections.emptyList(), assertEquals(Collections.emptyList(),
c.getConnectedContacts(transportId1)); c.getConnectedContacts(transportId1));
// Try to unregister the connection again - exception should be thrown // Try to unregister the connection again - exception should be thrown
try { try {
c.unregisterConnection(contactId, transportId); c.unregisterConnection(contactId, transportId, true);
fail(); fail();
} catch (IllegalArgumentException expected) { } catch (IllegalArgumentException expected) {
// Expected // Expected
} }
// Register both contacts with one transport, one contact with both - // Register both contacts with one transport, one contact with both -
// this should broadcast two ContactConnectedEvents // this should broadcast three ConnectionOpenedEvents and two
c.registerConnection(contactId, transportId); // ContactConnectedEvents
c.registerConnection(contactId1, transportId); c.registerConnection(contactId, transportId, true);
c.registerConnection(contactId1, transportId1); c.registerConnection(contactId1, transportId, true);
c.registerConnection(contactId1, transportId1, true);
Collection<ContactId> connected = c.getConnectedContacts(transportId); Collection<ContactId> connected = c.getConnectedContacts(transportId);
assertEquals(2, connected.size()); assertEquals(2, connected.size());
assertTrue(connected.contains(contactId)); assertTrue(connected.contains(contactId));