Don't try to reconnect if the connection was closed cleanly.

This commit is contained in:
akwizgran
2020-05-25 17:15:00 +01:00
parent 35d1b406f7
commit 6eb77465f6
8 changed files with 116 additions and 55 deletions

View File

@@ -47,7 +47,7 @@ public interface ConnectionRegistry {
* the contact. * the contact.
*/ */
void unregisterConnection(ContactId c, TransportId t, void unregisterConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming); InterruptibleConnection conn, boolean incoming, boolean exception);
/** /**
* Sets the {@link Priority priority} of a connection that was previously * Sets the {@link Priority priority} of a connection that was previously

View File

@@ -13,13 +13,14 @@ public class ConnectionClosedEvent extends Event {
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
private final boolean incoming; private final boolean incoming, exception;
public ConnectionClosedEvent(ContactId contactId, TransportId transportId, public ConnectionClosedEvent(ContactId contactId, TransportId transportId,
boolean incoming) { boolean incoming, boolean exception) {
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
this.incoming = incoming; this.incoming = incoming;
this.exception = exception;
} }
public ContactId getContactId() { public ContactId getContactId() {
@@ -33,4 +34,8 @@ public class ConnectionClosedEvent extends Event {
public boolean isIncoming() { public boolean isIncoming() {
return incoming; return incoming;
} }
public boolean isException() {
return exception;
}
} }

View File

@@ -136,7 +136,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
@Override @Override
public void unregisterConnection(ContactId c, TransportId t, public void unregisterConnection(ContactId c, TransportId t,
InterruptibleConnection conn, boolean incoming) { InterruptibleConnection conn, boolean incoming, boolean exception) {
if (LOG.isLoggable(INFO)) { if (LOG.isLoggable(INFO)) {
if (incoming) LOG.info("Incoming connection unregistered: " + t); if (incoming) LOG.info("Incoming connection unregistered: " + t);
else LOG.info("Outgoing connection unregistered: " + t); else LOG.info("Outgoing connection unregistered: " + t);
@@ -148,7 +148,8 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
lastConnection = recs.isEmpty(); lastConnection = recs.isEmpty();
} }
eventBus.broadcast(new ConnectionClosedEvent(c, t, incoming)); eventBus.broadcast(
new ConnectionClosedEvent(c, t, incoming, exception));
if (lastConnection) { if (lastConnection) {
LOG.info("Contact disconnected"); LOG.info("Contact disconnected");
eventBus.broadcast(new ContactDisconnectedEvent(c)); eventBus.broadcast(new ContactDisconnectedEvent(c));
@@ -178,8 +179,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
@Override @Override
public Collection<ContactId> getConnectedOrBetterContacts(TransportId t) { public Collection<ContactId> getConnectedOrBetterContacts(TransportId t) {
synchronized (lock) { synchronized (lock) {
List<TransportId> better = betterTransports.get(t); List<TransportId> better = getBetterTransports(t);
if (better == null) better = emptyList();
List<ContactId> contactIds = new ArrayList<>(); List<ContactId> contactIds = new ArrayList<>();
for (Entry<ContactId, List<ConnectionRecord>> e : for (Entry<ContactId, List<ConnectionRecord>> e :
contactConnections.entrySet()) { contactConnections.entrySet()) {

View File

@@ -74,12 +74,13 @@ class IncomingDuplexSyncConnection extends DuplexSyncConnection
createIncomingSession(ctx, reader, handler).run(); createIncomingSession(ctx, reader, handler).run();
reader.dispose(false, true); reader.dispose(false, true);
interruptOutgoingSession(); interruptOutgoingSession();
connectionRegistry.unregisterConnection(contactId, transportId,
this, true, false);
} catch (DbException | IOException e) { } catch (DbException | IOException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onReadError(true); onReadError(true);
} finally {
connectionRegistry.unregisterConnection(contactId, transportId, connectionRegistry.unregisterConnection(contactId, transportId,
this, true); this, true, true);
} }
} }

View File

@@ -118,12 +118,13 @@ class OutgoingDuplexSyncConnection extends DuplexSyncConnection
createIncomingSession(ctx, reader, handler).run(); createIncomingSession(ctx, reader, handler).run();
reader.dispose(false, true); reader.dispose(false, true);
interruptOutgoingSession(); interruptOutgoingSession();
connectionRegistry.unregisterConnection(contactId, transportId,
this, false, false);
} catch (DbException | IOException e) { } catch (DbException | IOException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
onReadError(); onReadError();
} finally {
connectionRegistry.unregisterConnection(contactId, transportId, connectionRegistry.unregisterConnection(contactId, transportId,
this, false); this, false, true);
} }
} }

View File

@@ -98,8 +98,8 @@ class PollerImpl implements Poller, EventListener {
ConnectionClosedEvent c = (ConnectionClosedEvent) e; ConnectionClosedEvent c = (ConnectionClosedEvent) e;
// Reschedule polling, the polling interval may have decreased // Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId()); reschedule(c.getTransportId());
if (!c.isIncoming()) { // If an outgoing connection failed, try to reconnect
// Connect to the disconnected contact if (!c.isIncoming() && c.isException()) {
connectToContact(c.getContactId(), c.getTransportId()); connectToContact(c.getContactId(), c.getTransportId());
} }
} else if (e instanceof ConnectionOpenedEvent) { } else if (e instanceof ConnectionOpenedEvent) {

View File

@@ -108,7 +108,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class)));
}}); }});
c.unregisterConnection(contactId1, transportId1, conn1, true); c.unregisterConnection(contactId1, transportId1, conn1, true, false);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -123,7 +123,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any( oneOf(eventBus).broadcast(with(any(
ContactDisconnectedEvent.class))); ContactDisconnectedEvent.class)));
}}); }});
c.unregisterConnection(contactId1, transportId1, conn2, true); c.unregisterConnection(contactId1, transportId1, conn2, true, false);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(emptyList(), c.getConnectedContacts(transportId1)); assertEquals(emptyList(), c.getConnectedContacts(transportId1));
@@ -131,7 +131,8 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
// Try to unregister the connection again - exception should be thrown // Try to unregister the connection again - exception should be thrown
try { try {
c.unregisterConnection(contactId1, transportId1, conn2, true); c.unregisterConnection(contactId1, transportId1, conn2,
true, false);
fail(); fail();
} catch (IllegalArgumentException expected) { } catch (IllegalArgumentException expected) {
// Expected // Expected
@@ -332,7 +333,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class)));
}}); }});
c.unregisterConnection(contactId1, transportId2, conn2, true); c.unregisterConnection(contactId1, transportId2, conn2, true, false);
context.assertIsSatisfied(); context.assertIsSatisfied();
assertEquals(singletonList(contactId1), assertEquals(singletonList(contactId1),
@@ -429,7 +430,7 @@ public class ConnectionRegistryImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class))); oneOf(eventBus).broadcast(with(any(ConnectionClosedEvent.class)));
}}); }});
c.unregisterConnection(contactId1, transportId1, conn1, true); c.unregisterConnection(contactId1, transportId1, conn1, true, false);
context.assertIsSatisfied(); context.assertIsSatisfied();
// The contact is not connected via transport 1 but is connected via a // The contact is not connected via transport 1 but is connected via a

View File

@@ -157,7 +157,21 @@ public class PollerImplTest extends BrambleMockTestCase {
} }
@Test @Test
public void testRescheduleAndReconnectOnConnectionClosed() public void testRescheduleOnOutgoingConnectionClosed() {
DuplexPlugin plugin = context.mock(DuplexPlugin.class);
context.checking(new Expectations() {{
allowing(plugin).getId();
will(returnValue(transportId));
}});
expectReschedule(plugin);
poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
false, false));
}
@Test
public void testRescheduleAndReconnectOnOutgoingConnectionFailed()
throws Exception { throws Exception {
DuplexPlugin plugin = context.mock(DuplexPlugin.class); DuplexPlugin plugin = context.mock(DuplexPlugin.class);
DuplexTransportConnection duplexConnection = DuplexTransportConnection duplexConnection =
@@ -166,45 +180,40 @@ public class PollerImplTest extends BrambleMockTestCase {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
allowing(plugin).getId(); allowing(plugin).getId();
will(returnValue(transportId)); will(returnValue(transportId));
// reschedule()
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Schedule the next poll
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
will(returnValue(future));
// connectToContact()
// Check whether the contact is already connected
oneOf(connectionRegistry).isConnected(contactId, transportId);
will(returnValue(false));
// Get the transport properties
oneOf(transportPropertyManager).getRemoteProperties(contactId,
transportId);
will(returnValue(properties));
// Connect to the contact
oneOf(plugin).createConnection(properties);
will(returnValue(duplexConnection));
// Pass the connection to the connection manager
oneOf(connectionManager).manageOutgoingConnection(contactId,
transportId, duplexConnection);
}}); }});
expectReschedule(plugin);
expectReconnect(plugin, duplexConnection);
poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId, poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
false)); false, true));
}
@Test
public void testRescheduleOnIncomingConnectionClosed() {
DuplexPlugin plugin = context.mock(DuplexPlugin.class);
context.checking(new Expectations() {{
allowing(plugin).getId();
will(returnValue(transportId));
}});
expectReschedule(plugin);
poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
true, false));
}
@Test
public void testRescheduleOnIncomingConnectionFailed() {
DuplexPlugin plugin = context.mock(DuplexPlugin.class);
context.checking(new Expectations() {{
allowing(plugin).getId();
will(returnValue(transportId));
}});
expectReschedule(plugin);
poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
true, false));
} }
@Test @Test
@@ -431,4 +440,48 @@ public class PollerImplTest extends BrambleMockTestCase {
poller.eventOccurred(new TransportEnabledEvent(transportId)); poller.eventOccurred(new TransportEnabledEvent(transportId));
poller.eventOccurred(new TransportDisabledEvent(transportId)); poller.eventOccurred(new TransportDisabledEvent(transportId));
} }
private void expectReschedule(Plugin plugin) {
context.checking(new Expectations() {{
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Schedule the next poll
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
will(returnValue(future));
}});
}
private void expectReconnect(DuplexPlugin plugin,
DuplexTransportConnection duplexConnection) throws Exception {
context.checking(new Expectations() {{
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Check whether the contact is already connected
oneOf(connectionRegistry).isConnected(contactId, transportId);
will(returnValue(false));
// Get the transport properties
oneOf(transportPropertyManager).getRemoteProperties(contactId,
transportId);
will(returnValue(properties));
// Connect to the contact
oneOf(plugin).createConnection(properties);
will(returnValue(duplexConnection));
// Pass the connection to the connection manager
oneOf(connectionManager).manageOutgoingConnection(contactId,
transportId, duplexConnection);
}});
}
} }