Several bug fixes for DuplexConnection, logging for ConnectionRegistry.

Some packet types weren't being generated or handled, connections
weren't properly disposed of when exceptions occurred.
This commit is contained in:
akwizgran
2013-04-10 00:09:09 +01:00
parent 0d0a0d3463
commit f1e12c630f
3 changed files with 186 additions and 17 deletions

View File

@@ -74,6 +74,10 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {}
};
private static final Runnable DIE = new Runnable() {
public void run() {}
};
protected final DatabaseComponent db;
protected final ConnectionRegistry connRegistry;
protected final ConnectionReaderFactory connReaderFactory;
@@ -117,7 +121,7 @@ abstract class DuplexConnection implements DatabaseListener {
contactId = ctx.getContactId();
transportId = ctx.getTransportId();
maxLatency = transport.getMaxLatency();
canSendOffer = new AtomicBoolean(false);
canSendOffer = new AtomicBoolean(true);
disposed = new AtomicBoolean(false);
writerTasks = new LinkedBlockingQueue<Runnable>();
}
@@ -170,18 +174,23 @@ abstract class DuplexConnection implements DatabaseListener {
try {
InputStream in = createConnectionReader().getInputStream();
PacketReader reader = packetReaderFactory.createPacketReader(in);
if(LOG.isLoggable(INFO)) LOG.info("Starting to read");
while(!reader.eof()) {
if(reader.hasAck()) {
Ack a = reader.readAck();
if(LOG.isLoggable(INFO)) LOG.info("Received ack");
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage();
if(LOG.isLoggable(INFO)) LOG.info("Received message");
cryptoExecutor.execute(new VerifyMessage(m));
} else if(reader.hasOffer()) {
Offer o = reader.readOffer();
if(LOG.isLoggable(INFO)) LOG.info("Received offer");
dbExecutor.execute(new ReceiveOffer(o));
} else if(reader.hasRequest()) {
Request r = reader.readRequest();
if(LOG.isLoggable(INFO)) LOG.info("Received request");
// Retrieve the offered message IDs
Collection<MessageId> offered = getOfferedMessageIds();
if(offered == null) throw new FormatException();
@@ -200,21 +209,44 @@ abstract class DuplexConnection implements DatabaseListener {
dbExecutor.execute(new SetSeen(seen));
// Start sending the requested messages
dbExecutor.execute(new GenerateBatches(requested));
} else if(reader.hasRetentionAck()) {
RetentionAck a = reader.readRetentionAck();
if(LOG.isLoggable(INFO)) LOG.info("Received retention ack");
dbExecutor.execute(new ReceiveRetentionAck(a));
} else if(reader.hasRetentionUpdate()) {
RetentionUpdate u = reader.readRetentionUpdate();
if(LOG.isLoggable(INFO))
LOG.info("Received retention update");
dbExecutor.execute(new ReceiveRetentionUpdate(u));
} else if(reader.hasSubscriptionAck()) {
SubscriptionAck a = reader.readSubscriptionAck();
if(LOG.isLoggable(INFO))
LOG.info("Received subscription ack");
dbExecutor.execute(new ReceiveSubscriptionAck(a));
} else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = reader.readSubscriptionUpdate();
if(LOG.isLoggable(INFO))
LOG.info("Received subscription update");
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if(reader.hasTransportAck()) {
TransportAck a = reader.readTransportAck();
if(LOG.isLoggable(INFO))
LOG.info("Received transport ack");
dbExecutor.execute(new ReceiveTransportAck(a));
} else if(reader.hasTransportUpdate()) {
TransportUpdate u = reader.readTransportUpdate();
if(LOG.isLoggable(INFO))
LOG.info("Received transport update");
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
}
// The writer will dispose of the transport if no exceptions occur
if(LOG.isLoggable(INFO)) LOG.info("Finished reading");
writerTasks.add(CLOSE);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
writerTasks.add(DIE);
}
}
@@ -236,6 +268,7 @@ abstract class DuplexConnection implements DatabaseListener {
OutputStream out = createConnectionWriter().getOutputStream();
writer = packetWriterFactory.createPacketWriter(out,
transport.shouldFlush());
if(LOG.isLoggable(INFO)) LOG.info("Starting to write");
// Send the initial packets: updates, acks, offer
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
@@ -247,14 +280,22 @@ abstract class DuplexConnection implements DatabaseListener {
if(canSendOffer.getAndSet(false))
dbExecutor.execute(new GenerateOffer());
// Main loop
Runnable task = null;
while(true) {
Runnable task = writerTasks.take();
if(task == CLOSE) break;
if(LOG.isLoggable(INFO))
LOG.info("Waiting for something to write");
task = writerTasks.take();
if(task == CLOSE || task == DIE) break;
task.run();
}
writer.flush();
writer.close();
dispose(false, true);
if(LOG.isLoggable(INFO)) LOG.info("Finished writing");
if(task == CLOSE) {
writer.flush();
writer.close();
dispose(false, true);
} else {
dispose(true, true);
}
} catch(InterruptedException e) {
if(LOG.isLoggable(INFO))
LOG.info("Interrupted while waiting for task");
@@ -262,14 +303,15 @@ abstract class DuplexConnection implements DatabaseListener {
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
} finally {
connRegistry.unregisterConnection(contactId, transportId);
db.removeListener(this);
}
db.removeListener(this);
connRegistry.unregisterConnection(contactId, transportId);
}
private void dispose(boolean exception, boolean recognised) {
if(disposed.getAndSet(true)) return;
if(LOG.isLoggable(INFO))
LOG.info("Disposing: " + exception + ", " + recognised);
ByteUtils.erase(ctx.getSecret());
try {
transport.dispose(exception, recognised);
@@ -290,6 +332,7 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
db.receiveAck(contactId, ack);
if(LOG.isLoggable(INFO)) LOG.info("DB received ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -308,6 +351,7 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
Message m = messageVerifier.verifyMessage(message);
if(LOG.isLoggable(INFO)) LOG.info("Verified message");
dbExecutor.execute(new ReceiveMessage(m));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -327,6 +371,7 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
db.receiveMessage(contactId, message);
if(LOG.isLoggable(INFO)) LOG.info("DB received message");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -345,6 +390,7 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
Request r = db.receiveOffer(contactId, offer);
if(LOG.isLoggable(INFO)) LOG.info("DB received offer");
writerTasks.add(new WriteRequest(r));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -365,6 +411,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
writer.writeRequest(request);
if(LOG.isLoggable(INFO)) LOG.info("Sent request");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
@@ -384,6 +431,66 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
db.setSeen(contactId, seen);
if(LOG.isLoggable(INFO)) LOG.info("DB set seen");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveRetentionAck implements Runnable {
private final RetentionAck ack;
private ReceiveRetentionAck(RetentionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveRetentionAck(contactId, ack);
if(LOG.isLoggable(INFO)) LOG.info("DB received retention ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveRetentionUpdate implements Runnable {
private final RetentionUpdate update;
private ReceiveRetentionUpdate(RetentionUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveRetentionUpdate(contactId, update);
if(LOG.isLoggable(INFO))
LOG.info("DB received retention update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveSubscriptionAck implements Runnable {
private final SubscriptionAck ack;
private ReceiveSubscriptionAck(SubscriptionAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveSubscriptionAck(contactId, ack);
if(LOG.isLoggable(INFO))
LOG.info("DB received subscription ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -402,6 +509,27 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
db.receiveSubscriptionUpdate(contactId, update);
if(LOG.isLoggable(INFO))
LOG.info("DB received subscription update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
private ReceiveTransportAck(TransportAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveTransportAck(contactId, ack);
if(LOG.isLoggable(INFO)) LOG.info("DB received transport ack");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -420,6 +548,8 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
db.receiveTransportUpdate(contactId, update);
if(LOG.isLoggable(INFO))
LOG.info("DB received transport update");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -434,6 +564,8 @@ abstract class DuplexConnection implements DatabaseListener {
int maxMessages = writer.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if(a != null) writerTasks.add(new WriteAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -454,6 +586,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
writer.writeAck(ack);
if(LOG.isLoggable(INFO)) LOG.info("Sent ack");
dbExecutor.execute(new GenerateAcks());
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -476,6 +609,8 @@ abstract class DuplexConnection implements DatabaseListener {
try {
Collection<byte[]> batch = db.generateBatch(contactId,
MAX_PACKET_LENGTH, maxLatency, requested);
if(LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (batch != null));
if(batch == null) new GenerateOffer().run();
else writerTasks.add(new WriteBatch(batch, requested));
} catch(DbException e) {
@@ -500,6 +635,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
for(byte[] raw : batch) writer.writeMessage(raw);
if(LOG.isLoggable(INFO)) LOG.info("Sent batch");
if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer());
else dbExecutor.execute(new GenerateBatches(requested));
} catch(IOException e) {
@@ -517,6 +653,8 @@ abstract class DuplexConnection implements DatabaseListener {
int maxMessages = writer.getMaxMessagesForOffer(Long.MAX_VALUE);
try {
Offer o = db.generateOffer(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if(o == null) {
// No messages to offer - wait for some to be added
canSendOffer.set(true);
@@ -545,6 +683,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
writer.writeOffer(offer);
if(LOG.isLoggable(INFO)) LOG.info("Sent offer");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
@@ -558,6 +697,8 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention ack: " + (a != null));
if(a != null) writerTasks.add(new WriteRetentionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -578,6 +719,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
writer.writeRetentionAck(ack);
if(LOG.isLoggable(INFO)) LOG.info("Sent retention ack");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
@@ -592,6 +734,8 @@ abstract class DuplexConnection implements DatabaseListener {
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated retention update: " + (u != null));
if(u != null) writerTasks.add(new WriteRetentionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -612,6 +756,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
writer.writeRetentionUpdate(update);
if(LOG.isLoggable(INFO)) LOG.info("Sent retention update");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
@@ -625,6 +770,8 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription ack: " + (a != null));
if(a != null) writerTasks.add(new WriteSubscriptionAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -645,6 +792,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
writer.writeSubscriptionAck(ack);
if(LOG.isLoggable(INFO)) LOG.info("Sent subscription ack");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
@@ -659,6 +807,8 @@ abstract class DuplexConnection implements DatabaseListener {
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated subscription update: " + (u != null));
if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -679,6 +829,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
writer.writeSubscriptionUpdate(update);
if(LOG.isLoggable(INFO)) LOG.info("Sent subscription update");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
@@ -693,6 +844,8 @@ abstract class DuplexConnection implements DatabaseListener {
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if(acks != null) writerTasks.add(new WriteTransportAcks(acks));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -713,6 +866,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
for(TransportAck a : acks) writer.writeTransportAck(a);
if(LOG.isLoggable(INFO)) LOG.info("Sent transport acks");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);
@@ -727,6 +881,8 @@ abstract class DuplexConnection implements DatabaseListener {
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if(LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if(t != null) writerTasks.add(new WriteTransportUpdates(t));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -747,6 +903,7 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null;
try {
for(TransportUpdate u : updates) writer.writeTransportUpdate(u);
if(LOG.isLoggable(INFO)) LOG.info("Sent transport updates");
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);

View File

@@ -108,9 +108,8 @@ class OutgoingSimplexConnection {
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true);
} finally {
connRegistry.unregisterConnection(contactId, transportId);
}
connRegistry.unregisterConnection(contactId, transportId);
}
private boolean writeTransportAcks(ConnectionWriter conn,

View File

@@ -1,5 +1,7 @@
package net.sf.briar.transport;
import static java.util.logging.Level.INFO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -7,6 +9,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportId;
@@ -15,6 +18,9 @@ import net.sf.briar.api.transport.ConnectionRegistry;
class ConnectionRegistryImpl implements ConnectionRegistry {
private static final Logger LOG =
Logger.getLogger(ConnectionRegistryImpl.class.getName());
// Locking: this
private final Map<TransportId, Map<ContactId, Integer>> connections;
// Locking: this
@@ -36,6 +42,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
}
public void registerConnection(ContactId c, TransportId t) {
if(LOG.isLoggable(INFO)) LOG.info("Connection registered");
boolean firstConnection = false;
synchronized(this) {
Map<ContactId, Integer> m = connections.get(t);
@@ -54,11 +61,14 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
contactCounts.put(c, count + 1);
}
}
if(firstConnection)
if(firstConnection) {
if(LOG.isLoggable(INFO)) LOG.info("Contact connected");
for(ConnectionListener l : listeners) l.contactConnected(c);
}
}
public void unregisterConnection(ContactId c, TransportId t) {
if(LOG.isLoggable(INFO)) LOG.info("Connection unregistered");
boolean lastConnection = false;
synchronized(this) {
Map<ContactId, Integer> m = connections.get(t);
@@ -79,16 +89,19 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
contactCounts.put(c, count - 1);
}
}
if(lastConnection)
if(lastConnection) {
if(LOG.isLoggable(INFO)) LOG.info("Contact disconnected");
for(ConnectionListener l : listeners) l.contactDisconnected(c);
}
}
public synchronized Collection<ContactId> getConnectedContacts(
TransportId t) {
Map<ContactId, Integer> m = connections.get(t);
if(m == null) return Collections.emptyList();
List<ContactId> keys = new ArrayList<ContactId>(m.keySet());
return Collections.unmodifiableList(keys);
List<ContactId> ids = new ArrayList<ContactId>(m.keySet());
if(LOG.isLoggable(INFO)) LOG.info(ids.size() + " contacts connected");
return Collections.unmodifiableList(ids);
}
public synchronized boolean isConnected(ContactId c) {