The response to a BMP Offer is now an Ack and/or a Request.

The Request packet now contains a list of message IDs, rather than a
bitmap referring to the list of messages IDs in the Offer. This allows
the Request to be understood out of context, e.g. if the Offer and
Request are sent over separate connections or a connection is replayed.
This commit is contained in:
akwizgran
2013-11-19 22:13:26 +00:00
parent 2e472c1d16
commit 1a351535be
12 changed files with 128 additions and 302 deletions

View File

@@ -11,7 +11,6 @@ import static net.sf.briar.db.DatabaseConstants.MIN_FREE_SPACE;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -32,6 +31,7 @@ import net.sf.briar.api.TransportConfig;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.clock.Clock;
import net.sf.briar.api.db.AckAndRequest;
import net.sf.briar.api.db.ContactExistsException;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
@@ -1402,9 +1402,9 @@ DatabaseCleaner.Callback {
return storeGroupMessage(txn, m, c);
}
public Request receiveOffer(ContactId c, Offer o) throws DbException {
Collection<MessageId> offered;
BitSet request;
public AckAndRequest receiveOffer(ContactId c, Offer o) throws DbException {
List<MessageId> ack = new ArrayList<MessageId>();
List<MessageId> request = new ArrayList<MessageId>();
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
@@ -1415,15 +1415,10 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
offered = o.getMessageIds();
request = new BitSet(offered.size());
Iterator<MessageId> it = offered.iterator();
for(int i = 0; it.hasNext(); i++) {
// If the message is not in the database, or not
// visible to the contact, request it
MessageId m = it.next();
if(!db.setStatusSeenIfVisible(txn, c, m))
request.set(i);
for(MessageId m : o.getMessageIds()) {
// If the message is present and visible, ack it
if(db.setStatusSeenIfVisible(txn, c, m)) ack.add(m);
else request.add(m);
}
db.commitTransaction(txn);
} catch(DbException e) {
@@ -1439,7 +1434,9 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
return new Request(request, offered.size());
Ack a = ack.isEmpty() ? null : new Ack(ack);
Request r = request.isEmpty() ? null : new Request(request);
return new AckAndRequest(a, r);
}
public void receiveRetentionAck(ContactId c, RetentionAck a)

View File

@@ -17,7 +17,6 @@ import static net.sf.briar.api.messaging.Types.TRANSPORT_UPDATE;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -113,22 +112,22 @@ class PacketReaderImpl implements PacketReader {
// Read the start of the struct
r.readStructStart(OFFER);
// Read the message IDs
List<MessageId> messages = new ArrayList<MessageId>();
List<MessageId> offered = new ArrayList<MessageId>();
r.readListStart();
while(!r.hasListEnd()) {
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length != UniqueId.LENGTH)
throw new FormatException();
messages.add(new MessageId(b));
offered.add(new MessageId(b));
}
if(messages.isEmpty()) throw new FormatException();
if(offered.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the struct
r.readStructEnd();
// Reset the reader
r.removeConsumer(counting);
// Build and return the offer
return new Offer(Collections.unmodifiableList(messages));
return new Offer(Collections.unmodifiableList(offered));
}
public boolean hasRequest() throws IOException {
@@ -141,25 +140,23 @@ class PacketReaderImpl implements PacketReader {
r.addConsumer(counting);
// Read the start of the struct
r.readStructStart(REQUEST);
// There may be up to 7 bits of padding at the end of the bitmap
int padding = r.readUint7();
if(padding > 7) throw new FormatException();
// Read the bitmap
byte[] bitmap = r.readBytes(MAX_PACKET_LENGTH);
// Read the message IDs
List<MessageId> requested = new ArrayList<MessageId>();
r.readListStart();
while(!r.hasListEnd()) {
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length != UniqueId.LENGTH)
throw new FormatException();
requested.add(new MessageId(b));
}
if(requested.isEmpty()) throw new FormatException();
r.readListEnd();
// Read the end of the struct
r.readStructEnd();
// Reset the reader
r.removeConsumer(counting);
// Convert the bitmap into a BitSet
int length = bitmap.length * 8 - padding;
BitSet b = new BitSet(length);
for(int i = 0; i < bitmap.length; i++) {
for(int j = 0; j < 8 && i * 8 + j < length; j++) {
byte bit = (byte) (128 >> j);
if((bitmap[i] & bit) != 0) b.set(i * 8 + j);
}
}
return new Request(b, length);
// Build and return the request
return new Request(Collections.unmodifiableList(requested));
}
public boolean hasRetentionAck() throws IOException {

View File

@@ -14,7 +14,6 @@ import static net.sf.briar.api.messaging.Types.TRANSPORT_UPDATE;
import java.io.IOException;
import java.io.OutputStream;
import java.util.BitSet;
import net.sf.briar.api.messaging.Ack;
import net.sf.briar.api.messaging.Group;
@@ -92,22 +91,10 @@ class PacketWriterImpl implements PacketWriter {
}
public void writeRequest(Request r) throws IOException {
BitSet b = r.getBitmap();
int length = r.getLength();
// If the number of bits isn't a multiple of 8, round up to a byte
int bytes = length % 8 == 0 ? length / 8 : length / 8 + 1;
byte[] bitmap = new byte[bytes];
// I'm kind of surprised BitSet doesn't have a method for this
for(int i = 0; i < length; i++) {
if(b.get(i)) {
int offset = i / 8;
byte bit = (byte) (128 >> i % 8);
bitmap[offset] |= bit;
}
}
w.writeStructStart(REQUEST);
w.writeUint7((byte) (bytes * 8 - length));
w.writeBytes(bitmap);
w.writeListStart();
for(MessageId m : r.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd();
w.writeStructEnd();
if(flush) out.flush();
}

View File

@@ -8,12 +8,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -23,6 +18,7 @@ import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.TransportId;
import net.sf.briar.api.db.AckAndRequest;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.event.ContactRemovedEvent;
@@ -93,8 +89,6 @@ abstract class DuplexConnection implements DatabaseListener {
private final AtomicBoolean canSendOffer, disposed;
private final BlockingQueue<Runnable> writerTasks;
private Collection<MessageId> offered = null; // Locking: this
private volatile PacketWriter writer = null;
DuplexConnection(Executor dbExecutor, Executor cryptoExecutor,
@@ -142,8 +136,11 @@ abstract class DuplexConnection implements DatabaseListener {
} else if(e instanceof LocalSubscriptionsUpdatedEvent) {
LocalSubscriptionsUpdatedEvent l =
(LocalSubscriptionsUpdatedEvent) e;
if(l.getAffectedContacts().contains(contactId))
if(l.getAffectedContacts().contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate());
if(canSendOffer.getAndSet(false))
dbExecutor.execute(new GenerateOffer());
}
} else if(e instanceof LocalTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdates());
} else if(e instanceof MessageReceivedEvent) {
@@ -159,6 +156,8 @@ abstract class DuplexConnection implements DatabaseListener {
dbExecutor.execute(new GenerateRetentionAck());
} else if(e instanceof RemoteSubscriptionsUpdatedEvent) {
dbExecutor.execute(new GenerateSubscriptionAck());
if(canSendOffer.getAndSet(false))
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof RemoteTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportAcks());
}
@@ -185,24 +184,7 @@ abstract class DuplexConnection implements DatabaseListener {
} else if(reader.hasRequest()) {
Request r = reader.readRequest();
if(LOG.isLoggable(INFO)) LOG.info("Received request");
// Retrieve the offered message IDs
Collection<MessageId> offered = getOfferedMessageIds();
if(offered == null) throw new FormatException();
// Work out which messages were requested
BitSet b = r.getBitmap();
List<MessageId> requested = new LinkedList<MessageId>();
List<MessageId> seen = new ArrayList<MessageId>();
int i = 0;
for(MessageId m : offered) {
if(b.get(i++)) requested.add(m);
else seen.add(m);
}
requested = Collections.synchronizedList(requested);
seen = Collections.unmodifiableList(seen);
// Mark the unrequested messages as seen
dbExecutor.execute(new SetSeen(seen));
// Start sending the requested messages
dbExecutor.execute(new GenerateBatches(requested));
dbExecutor.execute(new GenerateBatches(r.getMessageIds()));
} else if(reader.hasRetentionAck()) {
RetentionAck a = reader.readRetentionAck();
if(LOG.isLoggable(INFO)) LOG.info("Received retention ack");
@@ -244,17 +226,6 @@ abstract class DuplexConnection implements DatabaseListener {
}
}
private synchronized Collection<MessageId> getOfferedMessageIds() {
Collection<MessageId> ids = offered;
offered = null;
return ids;
}
private synchronized void setOfferedMessageIds(Collection<MessageId> ids) {
assert offered == null;
offered = ids;
}
void write() {
connRegistry.registerConnection(contactId, transportId);
db.addListener(this);
@@ -383,9 +354,15 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
Request r = db.receiveOffer(contactId, offer);
if(LOG.isLoggable(INFO)) LOG.info("DB received offer");
writerTasks.add(new WriteRequest(r));
AckAndRequest ar = db.receiveOffer(contactId, offer);
Ack a = ar.getAck();
Request r = ar.getRequest();
if(LOG.isLoggable(INFO)) {
LOG.info("DB received offer: " + (a != null)
+ " " + (r != null));
}
if(a != null) writerTasks.add(new WriteAck(a));
if(r != null) writerTasks.add(new WriteRequest(r));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -413,25 +390,6 @@ abstract class DuplexConnection implements DatabaseListener {
}
}
// This task runs on a database thread
private class SetSeen implements Runnable {
private final Collection<MessageId> seen;
private SetSeen(Collection<MessageId> seen) {
this.seen = seen;
}
public void run() {
try {
db.setSeen(contactId, seen);
if(LOG.isLoggable(INFO)) LOG.info("DB set seen");
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
// This task runs on a database thread
private class ReceiveRetentionAck implements Runnable {
@@ -649,15 +607,8 @@ abstract class DuplexConnection implements DatabaseListener {
Offer o = db.generateOffer(contactId, maxMessages);
if(LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if(o == null) {
// No messages to offer - wait for some to be added
canSendOffer.set(true);
} else {
// Store the offered message IDs
setOfferedMessageIds(o.getMessageIds());
// Write the offer on the writer thread
writerTasks.add(new WriteOffer(o));
}
if(o == null) canSendOffer.set(true);
else writerTasks.add(new WriteOffer(o));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}