Code cleanup and comments.

This commit is contained in:
akwizgran
2011-10-14 22:48:16 +01:00
parent 0ef1fcb686
commit e214c40b11

View File

@@ -52,9 +52,9 @@ abstract class StreamConnection implements DatabaseListener {
protected final ContactId contactId; protected final ContactId contactId;
protected final StreamTransportConnection connection; protected final StreamTransportConnection connection;
// The following fields must only be accessed with this's lock held // These fields must only be accessed with this's lock held
private int writerFlags = 0; private int writerFlags = 0;
private Collection<MessageId> outgoingOffer = null; private Collection<MessageId> offered = null;
private Collection<MessageId> requested = null; private Collection<MessageId> requested = null;
private Offer incomingOffer = null; private Offer incomingOffer = null;
@@ -107,6 +107,7 @@ abstract class StreamConnection implements DatabaseListener {
db.receiveBatch(contactId, b); db.receiveBatch(contactId, b);
} else if(proto.hasOffer()) { } else if(proto.hasOffer()) {
Offer o = proto.readOffer(); Offer o = proto.readOffer();
// Store the incoming offer and notify the writer
synchronized(this) { synchronized(this) {
writerFlags |= Flags.OFFER_RECEIVED; writerFlags |= Flags.OFFER_RECEIVED;
incomingOffer = o; incomingOffer = o;
@@ -114,31 +115,33 @@ abstract class StreamConnection implements DatabaseListener {
} }
} else if(proto.hasRequest()) { } else if(proto.hasRequest()) {
Request r = proto.readRequest(); Request r = proto.readRequest();
Collection<MessageId> offered, seen, unseen; // Retrieve the offered message IDs
Collection<MessageId> off;
synchronized(this) { synchronized(this) {
if(outgoingOffer == null) if(offered == null)
throw new IOException("Unexpected request packet"); throw new IOException("Unexpected request packet");
offered = outgoingOffer; off = offered;
offered = null;
} }
// Work out which messages were requested // Work out which messages were requested
BitSet b = r.getBitmap(); BitSet b = r.getBitmap();
seen = new ArrayList<MessageId>(); Collection<MessageId> req = new LinkedList<MessageId>();
unseen = new LinkedList<MessageId>(); Collection<MessageId> seen = new ArrayList<MessageId>();
int i = 0; int i = 0;
for(MessageId m : offered) { for(MessageId m : off) {
if(b.get(i++)) unseen.add(m); if(b.get(i++)) req.add(m);
else seen.add(m); else seen.add(m);
} }
// Mark the unrequested messages as seen
db.setSeen(contactId, seen);
// Store the requested message IDs and notify the writer
synchronized(this) { synchronized(this) {
assert outgoingOffer != null; if(requested != null)
if(requested != null && !requested.isEmpty())
throw new IOException("Unexpected request packet"); throw new IOException("Unexpected request packet");
outgoingOffer = null; requested = req;
requested = unseen;
writerFlags |= Flags.REQUEST_RECEIVED; writerFlags |= Flags.REQUEST_RECEIVED;
notifyAll(); notifyAll();
} }
db.setSeen(contactId, seen);
} else if(proto.hasSubscriptionUpdate()) { } else if(proto.hasSubscriptionUpdate()) {
SubscriptionUpdate s = proto.readSubscriptionUpdate(); SubscriptionUpdate s = proto.readSubscriptionUpdate();
db.receiveSubscriptionUpdate(contactId, s); db.receiveSubscriptionUpdate(contactId, s);
@@ -179,12 +182,12 @@ abstract class StreamConnection implements DatabaseListener {
sendAcks(ackWriter); sendAcks(ackWriter);
State state = State.SEND_OFFER; State state = State.SEND_OFFER;
// Main loop // Main loop
boolean close = false; while(true) {
while(!close) {
int flags = 0; int flags = 0;
switch(state) { switch(state) {
case SEND_OFFER: case SEND_OFFER:
// Try to send an offer
if(sendOffer(offerWriter)) state = State.AWAIT_REQUEST; if(sendOffer(offerWriter)) state = State.AWAIT_REQUEST;
else state = State.IDLE; else state = State.IDLE;
break; break;
@@ -203,8 +206,8 @@ abstract class StreamConnection implements DatabaseListener {
// Handle the flags in approximate order of urgency // Handle the flags in approximate order of urgency
if((flags & Flags.CONTACTS_UPDATED) != 0) { if((flags & Flags.CONTACTS_UPDATED) != 0) {
if(!db.getContacts().contains(contactId)) { if(!db.getContacts().contains(contactId)) {
close = true; connection.dispose(true);
break; return;
} }
} }
if((flags & Flags.TRANSPORTS_UPDATED) != 0) { if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
@@ -220,6 +223,7 @@ abstract class StreamConnection implements DatabaseListener {
sendRequest(requestWriter); sendRequest(requestWriter);
} }
if((flags & Flags.REQUEST_RECEIVED) != 0) { if((flags & Flags.REQUEST_RECEIVED) != 0) {
// Should only be received in state AWAIT_REQUEST
throw new IOException("Unexpected request packet"); throw new IOException("Unexpected request packet");
} }
if((flags & Flags.MESSAGES_ADDED) != 0) { if((flags & Flags.MESSAGES_ADDED) != 0) {
@@ -241,8 +245,8 @@ abstract class StreamConnection implements DatabaseListener {
// Handle the flags in approximate order of urgency // Handle the flags in approximate order of urgency
if((flags & Flags.CONTACTS_UPDATED) != 0) { if((flags & Flags.CONTACTS_UPDATED) != 0) {
if(!db.getContacts().contains(contactId)) { if(!db.getContacts().contains(contactId)) {
close = true; connection.dispose(true);
break; return;
} }
} }
if((flags & Flags.TRANSPORTS_UPDATED) != 0) { if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
@@ -274,8 +278,8 @@ abstract class StreamConnection implements DatabaseListener {
// Handle the flags in approximate order of urgency // Handle the flags in approximate order of urgency
if((flags & Flags.CONTACTS_UPDATED) != 0) { if((flags & Flags.CONTACTS_UPDATED) != 0) {
if(!db.getContacts().contains(contactId)) { if(!db.getContacts().contains(contactId)) {
close = true; connection.dispose(true);
break; return;
} }
} }
if((flags & Flags.TRANSPORTS_UPDATED) != 0) { if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
@@ -291,12 +295,13 @@ abstract class StreamConnection implements DatabaseListener {
sendRequest(requestWriter); sendRequest(requestWriter);
} }
if((flags & Flags.REQUEST_RECEIVED) != 0) { if((flags & Flags.REQUEST_RECEIVED) != 0) {
// Should only be received in state AWAIT_REQUEST
throw new IOException("Unexpected request packet"); throw new IOException("Unexpected request packet");
} }
if((flags & Flags.MESSAGES_ADDED) != 0) { if((flags & Flags.MESSAGES_ADDED) != 0) {
// Ignored in this state // Ignored in this state
} }
// Send a batch if possible, otherwise an offer // Try to send a batch
if(!sendBatch(batchWriter)) state = State.SEND_OFFER; if(!sendBatch(batchWriter)) state = State.SEND_OFFER;
break; break;
} }
@@ -317,35 +322,47 @@ abstract class StreamConnection implements DatabaseListener {
} }
private boolean sendBatch(BatchWriter b) throws DbException, IOException { private boolean sendBatch(BatchWriter b) throws DbException, IOException {
Collection<MessageId> ids; Collection<MessageId> req;
// Retrieve the requested message IDs
synchronized(this) { synchronized(this) {
assert outgoingOffer == null; assert offered == null;
assert requested != null; assert requested != null;
ids = requested; req = requested;
}
// Try to generate a batch, updating the collection of message IDs
boolean anyAdded = db.generateBatch(contactId, b, req);
// If no more batches can be generated, discard the remaining IDs
if(!anyAdded) {
synchronized(this) {
assert offered == null;
assert requested == req;
requested = null;
}
} }
boolean anyAdded = db.generateBatch(contactId, b, ids);
if(!anyAdded) ids.clear();
return anyAdded; return anyAdded;
} }
private boolean sendOffer(OfferWriter o) throws DbException, IOException { private boolean sendOffer(OfferWriter o) throws DbException, IOException {
Collection<MessageId> ids = db.generateOffer(contactId, o); // Generate an offer
Collection<MessageId> off = db.generateOffer(contactId, o);
// Store the offered message IDs
synchronized(this) { synchronized(this) {
assert outgoingOffer == null; assert offered == null;
assert requested == null || requested.isEmpty(); assert requested == null;
outgoingOffer = ids; offered = off;
} }
boolean anyOffered = !ids.isEmpty(); return !off.isEmpty();
return anyOffered;
} }
private void sendRequest(RequestWriter r) throws DbException, IOException { private void sendRequest(RequestWriter r) throws DbException, IOException {
Offer o; Offer o;
// Retrieve the incoming offer
synchronized(this) { synchronized(this) {
assert incomingOffer != null; assert incomingOffer != null;
o = incomingOffer; o = incomingOffer;
incomingOffer = null; incomingOffer = null;
} }
// Process the offer and generate a request
db.receiveOffer(contactId, o, r); db.receiveOffer(contactId, o, r);
} }