Don't interrupt connections until priority is set.

This maintains compatibility with older peers that don't know about
priorities or transport preferences and will try to replace any
connections we close.
This commit is contained in:
akwizgran
2020-06-01 14:23:08 +01:00
parent 4aaa8c3b93
commit d3751fbead
2 changed files with 107 additions and 52 deletions

View File

@@ -83,41 +83,16 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
if (incoming) LOG.info("Incoming connection registered: " + t);
else LOG.info("Outgoing connection registered: " + t);
}
List<InterruptibleConnection> toInterrupt;
boolean firstConnection = false, interruptNewConnection = false;
boolean firstConnection;
synchronized (lock) {
List<ConnectionRecord> recs = contactConnections.get(c);
if (recs == null) {
recs = new ArrayList<>();
contactConnections.put(c, recs);
}
if (recs.isEmpty()) {
toInterrupt = emptyList();
firstConnection = true;
} else {
toInterrupt = new ArrayList<>(recs.size());
for (ConnectionRecord rec : recs) {
int compare = compare(t, rec.transportId);
if (compare == -1) {
// The old connection is better than the new one
interruptNewConnection = true;
} else if (compare == 1 && !rec.interrupted) {
// The new connection is better than the old one
toInterrupt.add(rec.conn);
rec.interrupted = true;
}
}
}
firstConnection = recs.isEmpty();
recs.add(new ConnectionRecord(t, conn));
}
if (interruptNewConnection) {
LOG.info("Interrupting new connection");
conn.interruptOutgoingSession();
}
for (InterruptibleConnection old : toInterrupt) {
LOG.info("Interrupting old connection");
old.interruptOutgoingSession();
}
eventBus.broadcast(new ConnectionOpenedEvent(c, t, incoming));
if (firstConnection) {
LOG.info("Contact connected");
@@ -125,17 +100,6 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
}
private int compare(TransportId a, TransportId b) {
if (getBetterTransports(a).contains(b)) return -1;
else if (getBetterTransports(b).contains(a)) return 1;
else return 0;
}
private List<TransportId> getBetterTransports(TransportId t) {
List<TransportId> better = betterTransports.get(t);
return better == null ? emptyList() : better;
}
@Override
public void setPriority(ContactId c, TransportId t,
InterruptibleConnection conn, Priority priority) {
@@ -150,8 +114,9 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
if (rec.conn == conn) {
// Store the priority of this connection
rec.priority = priority;
} else if (rec.transportId.equals(t)) {
int compare = compare(priority, rec.priority);
} else if (rec.priority != null) {
int compare = compareConnections(t, priority,
rec.transportId, rec.priority);
if (compare == -1) {
// The old connection is better than the new one
interruptNewConnection = true;
@@ -173,8 +138,16 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
}
private int compare(Priority a, @Nullable Priority b) {
return b == null ? 0 : Bytes.compare(a.getNonce(), b.getNonce());
private int compareConnections(TransportId tA, Priority pA, TransportId tB,
Priority pB) {
if (getBetterTransports(tA).contains(tB)) return -1;
if (getBetterTransports(tB).contains(tA)) return 1;
return tA.equals(tB) ? Bytes.compare(pA.getNonce(), pB.getNonce()) : 0;
}
private List<TransportId> getBetterTransports(TransportId t) {
List<TransportId> better = betterTransports.get(t);
return better == null ? emptyList() : better;
}
@Override
@@ -184,12 +157,12 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
if (incoming) LOG.info("Incoming connection unregistered: " + t);
else LOG.info("Outgoing connection unregistered: " + t);
}
boolean lastConnection = false;
boolean lastConnection;
synchronized (lock) {
List<ConnectionRecord> recs = contactConnections.get(c);
if (recs == null || !recs.remove(new ConnectionRecord(t, conn)))
throw new IllegalArgumentException();
if (recs.isEmpty()) lastConnection = true;
lastConnection = recs.isEmpty();
}
eventBus.broadcast(new ConnectionClosedEvent(c, t, incoming));
if (lastConnection) {