mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-15 12:19:54 +01:00
Decouple the database from IO.
This will enable asynchronous access to the database for IO threads.
This commit is contained in:
@@ -178,7 +178,8 @@ interface Database<T> {
|
||||
* <p>
|
||||
* Locking: contact read, messageStatus read.
|
||||
*/
|
||||
Collection<BatchId> getBatchesToAck(T txn, ContactId c) throws DbException;
|
||||
Collection<BatchId> getBatchesToAck(T txn, ContactId c, int maxBatches)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the configuration for the given transport.
|
||||
@@ -315,6 +316,16 @@ interface Database<T> {
|
||||
*/
|
||||
int getNumberOfSendableChildren(T txn, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of some messages that are eligible to be sent to the
|
||||
* given contact, up to the given number of messages.
|
||||
* <p>
|
||||
* Locking: contact read, message read, messageStatus read,
|
||||
* subscription read.
|
||||
*/
|
||||
Collection<MessageId> getOfferableMessages(T txn, ContactId c,
|
||||
int maxMessages) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of the oldest messages in the database, with a total
|
||||
* size less than or equal to the given size.
|
||||
@@ -361,16 +372,6 @@ interface Database<T> {
|
||||
*/
|
||||
int getSendability(T txn, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of some messages that are eligible to be sent to the
|
||||
* given contact.
|
||||
* <p>
|
||||
* Locking: contact read, message read, messageStatus read,
|
||||
* subscription read.
|
||||
*/
|
||||
Collection<MessageId> getSendableMessages(T txn, ContactId c)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of some messages that are eligible to be sent to the
|
||||
* given contact, with a total size less than or equal to the given size.
|
||||
|
||||
@@ -20,7 +20,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import net.sf.briar.api.Bytes;
|
||||
import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.Rating;
|
||||
import net.sf.briar.api.TransportConfig;
|
||||
@@ -51,17 +50,14 @@ import net.sf.briar.api.protocol.GroupId;
|
||||
import net.sf.briar.api.protocol.Message;
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.Transport;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
import net.sf.briar.api.protocol.writers.AckWriter;
|
||||
import net.sf.briar.api.protocol.writers.BatchWriter;
|
||||
import net.sf.briar.api.protocol.writers.OfferWriter;
|
||||
import net.sf.briar.api.protocol.writers.RequestWriter;
|
||||
import net.sf.briar.api.protocol.writers.SubscriptionUpdateWriter;
|
||||
import net.sf.briar.api.protocol.writers.TransportUpdateWriter;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionWindow;
|
||||
import net.sf.briar.util.ByteUtils;
|
||||
@@ -105,6 +101,7 @@ DatabaseCleaner.Callback {
|
||||
private final Database<T> db;
|
||||
private final DatabaseCleaner cleaner;
|
||||
private final ShutdownManager shutdown;
|
||||
private final PacketFactory packetFactory;
|
||||
|
||||
private final Collection<DatabaseListener> listeners =
|
||||
new CopyOnWriteArrayList<DatabaseListener>();
|
||||
@@ -119,10 +116,11 @@ DatabaseCleaner.Callback {
|
||||
|
||||
@Inject
|
||||
DatabaseComponentImpl(Database<T> db, DatabaseCleaner cleaner,
|
||||
ShutdownManager shutdown) {
|
||||
ShutdownManager shutdown, PacketFactory packetFactory) {
|
||||
this.db = db;
|
||||
this.cleaner = cleaner;
|
||||
this.shutdown = shutdown;
|
||||
this.packetFactory = packetFactory;
|
||||
}
|
||||
|
||||
public void open(boolean resume) throws DbException, IOException {
|
||||
@@ -265,7 +263,7 @@ DatabaseCleaner.Callback {
|
||||
if(sendability > 0) updateAncestorSendability(txn, id, true);
|
||||
// Count the bytes stored
|
||||
synchronized(spaceLock) {
|
||||
bytesStoredSinceLastCheck += m.getLength();
|
||||
bytesStoredSinceLastCheck += m.getSerialised().length;
|
||||
}
|
||||
}
|
||||
return stored;
|
||||
@@ -373,7 +371,7 @@ DatabaseCleaner.Callback {
|
||||
else db.setStatus(txn, c, id, Status.NEW);
|
||||
// Count the bytes stored
|
||||
synchronized(spaceLock) {
|
||||
bytesStoredSinceLastCheck += m.getLength();
|
||||
bytesStoredSinceLastCheck += m.getSerialised().length;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -415,17 +413,16 @@ DatabaseCleaner.Callback {
|
||||
return i;
|
||||
}
|
||||
|
||||
public boolean generateAck(ContactId c, AckWriter a) throws DbException,
|
||||
IOException {
|
||||
public Ack generateAck(ContactId c, int maxBatches) throws DbException {
|
||||
Collection<BatchId> acked;
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
Collection<BatchId> acks, sent = new ArrayList<BatchId>();
|
||||
messageStatusLock.readLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
acks = db.getBatchesToAck(txn, c);
|
||||
acked = db.getBatchesToAck(txn, c, maxBatches);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -434,20 +431,14 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
messageStatusLock.readLock().unlock();
|
||||
}
|
||||
for(BatchId b : acks) {
|
||||
if(!a.writeBatchId(b)) break;
|
||||
sent.add(b);
|
||||
}
|
||||
// Record the contents of the ack, unless it's empty
|
||||
if(sent.isEmpty()) return false;
|
||||
a.finish();
|
||||
if(acked.isEmpty()) return null;
|
||||
// Record the contents of the ack
|
||||
messageStatusLock.writeLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
db.removeBatchesToAck(txn, c, sent);
|
||||
db.removeBatchesToAck(txn, c, acked);
|
||||
db.commitTransaction(txn);
|
||||
return true;
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
throw e;
|
||||
@@ -458,12 +449,14 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
return packetFactory.createAck(acked);
|
||||
}
|
||||
|
||||
public boolean generateBatch(ContactId c, BatchWriter b) throws DbException,
|
||||
IOException {
|
||||
public RawBatch generateBatch(ContactId c, int capacity)
|
||||
throws DbException {
|
||||
Collection<MessageId> ids;
|
||||
Collection<Bytes> messages = new ArrayList<Bytes>();
|
||||
List<byte[]> messages = new ArrayList<byte[]>();
|
||||
RawBatch b;
|
||||
// Get some sendable messages from the database
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
@@ -476,10 +469,9 @@ DatabaseCleaner.Callback {
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
int capacity = b.getCapacity();
|
||||
ids = db.getSendableMessages(txn, c, capacity);
|
||||
for(MessageId m : ids) {
|
||||
messages.add(new Bytes(db.getMessage(txn, m)));
|
||||
messages.add(db.getMessage(txn, m));
|
||||
}
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
@@ -492,40 +484,14 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
messageStatusLock.readLock().unlock();
|
||||
}
|
||||
} finally {
|
||||
messageLock.readLock().unlock();
|
||||
}
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
if(ids.isEmpty()) return false;
|
||||
writeAndRecordBatch(c, b, ids, messages);
|
||||
return true;
|
||||
}
|
||||
|
||||
private void writeAndRecordBatch(ContactId c, BatchWriter b,
|
||||
Collection<MessageId> ids, Collection<Bytes> messages)
|
||||
throws DbException, IOException {
|
||||
assert !ids.isEmpty();
|
||||
assert !messages.isEmpty();
|
||||
assert ids.size() == messages.size();
|
||||
// Add the messages to the batch
|
||||
for(Bytes raw : messages) {
|
||||
boolean written = b.writeMessage(raw.getBytes());
|
||||
assert written;
|
||||
}
|
||||
BatchId id = b.finish();
|
||||
// Record the contents of the batch
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
messageLock.readLock().lock();
|
||||
try {
|
||||
if(messages.isEmpty()) return null;
|
||||
messages = Collections.unmodifiableList(messages);
|
||||
b = packetFactory.createBatch(messages);
|
||||
messageStatusLock.writeLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
db.addOutstandingBatch(txn, c, id, ids);
|
||||
db.addOutstandingBatch(txn, c, b.getId(), ids);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -540,12 +506,14 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
public boolean generateBatch(ContactId c, BatchWriter b,
|
||||
Collection<MessageId> requested) throws DbException, IOException {
|
||||
public RawBatch generateBatch(ContactId c, int capacity,
|
||||
Collection<MessageId> requested) throws DbException {
|
||||
Collection<MessageId> ids = new ArrayList<MessageId>();
|
||||
Collection<Bytes> messages = new ArrayList<Bytes>();
|
||||
List<byte[]> messages = new ArrayList<byte[]>();
|
||||
RawBatch b;
|
||||
// Get some sendable messages from the database
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
@@ -558,15 +526,15 @@ DatabaseCleaner.Callback {
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
int capacity = b.getCapacity();
|
||||
Iterator<MessageId> it = requested.iterator();
|
||||
while(it.hasNext()) {
|
||||
MessageId m = it.next();
|
||||
byte[] raw = db.getMessageIfSendable(txn, c, m);
|
||||
if(raw != null) {
|
||||
if(raw.length > capacity) break;
|
||||
messages.add(raw);
|
||||
ids.add(m);
|
||||
messages.add(new Bytes(raw));
|
||||
capacity -= raw.length;
|
||||
}
|
||||
it.remove();
|
||||
}
|
||||
@@ -581,21 +549,34 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
messageStatusLock.readLock().unlock();
|
||||
}
|
||||
if(messages.isEmpty()) return null;
|
||||
messages = Collections.unmodifiableList(messages);
|
||||
b = packetFactory.createBatch(messages);
|
||||
messageStatusLock.writeLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
db.addOutstandingBatch(txn, c, b.getId(), ids);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
messageStatusLock.writeLock().unlock();
|
||||
}
|
||||
} finally {
|
||||
messageLock.readLock().unlock();
|
||||
}
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
if(ids.isEmpty()) return false;
|
||||
writeAndRecordBatch(c, b, ids, messages);
|
||||
return true;
|
||||
return b;
|
||||
}
|
||||
|
||||
public Collection<MessageId> generateOffer(ContactId c, OfferWriter o)
|
||||
throws DbException, IOException {
|
||||
Collection<MessageId> sendable;
|
||||
List<MessageId> sent = new ArrayList<MessageId>();
|
||||
public Offer generateOffer(ContactId c, int maxMessages)
|
||||
throws DbException {
|
||||
Collection<MessageId> offered;
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
@@ -605,7 +586,7 @@ DatabaseCleaner.Callback {
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
sendable = db.getSendableMessages(txn, c);
|
||||
offered = db.getOfferableMessages(txn, c, maxMessages);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -620,33 +601,41 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
for(MessageId m : sendable) {
|
||||
if(!o.writeMessageId(m)) break;
|
||||
sent.add(m);
|
||||
}
|
||||
if(!sent.isEmpty()) o.finish();
|
||||
return Collections.unmodifiableList(sent);
|
||||
return packetFactory.createOffer(offered);
|
||||
}
|
||||
|
||||
public void generateSubscriptionUpdate(ContactId c,
|
||||
SubscriptionUpdateWriter s) throws DbException, IOException {
|
||||
Map<Group, Long> subs = null;
|
||||
long timestamp = 0L;
|
||||
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c)
|
||||
throws DbException {
|
||||
boolean due;
|
||||
Map<Group, Long> subs;
|
||||
long timestamp;
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
subscriptionLock.writeLock().lock();
|
||||
subscriptionLock.readLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
// Work out whether an update is due
|
||||
long modified = db.getSubscriptionsModified(txn, c);
|
||||
long sent = db.getSubscriptionsSent(txn, c);
|
||||
if(modified >= sent || updateIsDue(sent)) {
|
||||
subs = db.getVisibleSubscriptions(txn, c);
|
||||
timestamp = System.currentTimeMillis();
|
||||
db.setSubscriptionsSent(txn, c, timestamp);
|
||||
}
|
||||
due = modified >= sent || updateIsDue(sent);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
subscriptionLock.readLock().unlock();
|
||||
}
|
||||
if(!due) return null;
|
||||
subscriptionLock.writeLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
subs = db.getVisibleSubscriptions(txn, c);
|
||||
timestamp = System.currentTimeMillis();
|
||||
db.setSubscriptionsSent(txn, c, timestamp);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -658,7 +647,7 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
if(subs != null) s.writeSubscriptions(subs, timestamp);
|
||||
return packetFactory.createSubscriptionUpdate(subs, timestamp);
|
||||
}
|
||||
|
||||
private boolean updateIsDue(long sent) {
|
||||
@@ -666,25 +655,38 @@ DatabaseCleaner.Callback {
|
||||
return now - sent >= DatabaseConstants.MAX_UPDATE_INTERVAL;
|
||||
}
|
||||
|
||||
public void generateTransportUpdate(ContactId c, TransportUpdateWriter t)
|
||||
throws DbException, IOException {
|
||||
Collection<Transport> transports = null;
|
||||
long timestamp = 0L;
|
||||
public TransportUpdate generateTransportUpdate(ContactId c)
|
||||
throws DbException {
|
||||
boolean due;
|
||||
Collection<Transport> transports;
|
||||
long timestamp;
|
||||
contactLock.readLock().lock();
|
||||
try {
|
||||
if(!containsContact(c)) throw new NoSuchContactException();
|
||||
transportLock.writeLock().lock();
|
||||
transportLock.readLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
// Work out whether an update is due
|
||||
long modified = db.getTransportsModified(txn);
|
||||
long sent = db.getTransportsSent(txn, c);
|
||||
if(modified >= sent || updateIsDue(sent)) {
|
||||
transports = db.getLocalTransports(txn);
|
||||
timestamp = System.currentTimeMillis();
|
||||
db.setTransportsSent(txn, c, timestamp);
|
||||
}
|
||||
due = modified >= sent || updateIsDue(sent);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
transportLock.readLock().unlock();
|
||||
}
|
||||
if(!due) return null;
|
||||
transportLock.writeLock().lock();
|
||||
try {
|
||||
T txn = db.startTransaction();
|
||||
try {
|
||||
transports = db.getLocalTransports(txn);
|
||||
timestamp = System.currentTimeMillis();
|
||||
db.setTransportsSent(txn, c, timestamp);
|
||||
db.commitTransaction(txn);
|
||||
} catch(DbException e) {
|
||||
db.abortTransaction(txn);
|
||||
@@ -696,7 +698,7 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
if(transports != null) t.writeTransports(transports, timestamp);
|
||||
return packetFactory.createTransportUpdate(transports, timestamp);
|
||||
}
|
||||
|
||||
public TransportConfig getConfig(TransportId t) throws DbException {
|
||||
@@ -1119,8 +1121,7 @@ DatabaseCleaner.Callback {
|
||||
return anyStored;
|
||||
}
|
||||
|
||||
public void receiveOffer(ContactId c, Offer o, RequestWriter r)
|
||||
throws DbException, IOException {
|
||||
public Request receiveOffer(ContactId c, Offer o) throws DbException {
|
||||
Collection<MessageId> offered;
|
||||
BitSet request;
|
||||
contactLock.readLock().lock();
|
||||
@@ -1161,7 +1162,7 @@ DatabaseCleaner.Callback {
|
||||
} finally {
|
||||
contactLock.readLock().unlock();
|
||||
}
|
||||
r.writeRequest(request, offered.size());
|
||||
return packetFactory.createRequest(request, offered.size());
|
||||
}
|
||||
|
||||
public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s)
|
||||
|
||||
@@ -10,6 +10,7 @@ import net.sf.briar.api.db.DatabaseMaxSize;
|
||||
import net.sf.briar.api.db.DatabasePassword;
|
||||
import net.sf.briar.api.lifecycle.ShutdownManager;
|
||||
import net.sf.briar.api.protocol.GroupFactory;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.transport.ConnectionContextFactory;
|
||||
import net.sf.briar.api.transport.ConnectionWindowFactory;
|
||||
|
||||
@@ -36,7 +37,9 @@ public class DatabaseModule extends AbstractModule {
|
||||
|
||||
@Provides @Singleton
|
||||
DatabaseComponent getDatabaseComponent(Database<Connection> db,
|
||||
DatabaseCleaner cleaner, ShutdownManager shutdown) {
|
||||
return new DatabaseComponentImpl<Connection>(db, cleaner, shutdown);
|
||||
DatabaseCleaner cleaner, ShutdownManager shutdown,
|
||||
PacketFactory packetFactory) {
|
||||
return new DatabaseComponentImpl<Connection>(db, cleaner, shutdown,
|
||||
packetFactory);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -612,10 +612,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
else ps.setBytes(4, m.getAuthor().getBytes());
|
||||
ps.setString(5, m.getSubject());
|
||||
ps.setLong(6, m.getTimestamp());
|
||||
ps.setInt(7, m.getLength());
|
||||
byte[] raw = m.getSerialised();
|
||||
ps.setInt(7, raw.length);
|
||||
ps.setInt(8, m.getBodyStart());
|
||||
ps.setInt(9, m.getBodyLength());
|
||||
ps.setBytes(10, m.getSerialised());
|
||||
ps.setBytes(10, raw);
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected != 1) throw new DbStateException();
|
||||
ps.close();
|
||||
@@ -700,10 +701,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
else ps.setBytes(2, m.getParent().getBytes());
|
||||
ps.setString(3, m.getSubject());
|
||||
ps.setLong(4, m.getTimestamp());
|
||||
ps.setInt(5, m.getLength());
|
||||
byte[] raw = m.getSerialised();
|
||||
ps.setInt(5, raw.length);
|
||||
ps.setInt(6, m.getBodyStart());
|
||||
ps.setInt(7, m.getBodyLength());
|
||||
ps.setBytes(8, m.getSerialised());
|
||||
ps.setBytes(8, raw);
|
||||
ps.setInt(9, c.getInt());
|
||||
int affected = ps.executeUpdate();
|
||||
if(affected != 1) throw new DbStateException();
|
||||
@@ -889,15 +891,17 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<BatchId> getBatchesToAck(Connection txn, ContactId c)
|
||||
throws DbException {
|
||||
public Collection<BatchId> getBatchesToAck(Connection txn, ContactId c,
|
||||
int maxBatches) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT batchId FROM batchesToAck"
|
||||
+ " WHERE contactId = ?";
|
||||
+ " WHERE contactId = ?"
|
||||
+ " LIMIT ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, maxBatches);
|
||||
rs = ps.executeQuery();
|
||||
List<BatchId> ids = new ArrayList<BatchId>();
|
||||
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
|
||||
@@ -1517,8 +1521,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<MessageId> getSendableMessages(Connection txn,
|
||||
ContactId c) throws DbException {
|
||||
public Collection<MessageId> getOfferableMessages(Connection txn,
|
||||
ContactId c, int maxMessages) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
@@ -1526,15 +1530,19 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
String sql = "SELECT messages.messageId FROM messages"
|
||||
+ " JOIN statuses ON messages.messageId = statuses.messageId"
|
||||
+ " WHERE messages.contactId = ? AND status = ?"
|
||||
+ " ORDER BY timestamp";
|
||||
+ " ORDER BY timestamp"
|
||||
+ " LIMIT ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setShort(2, (short) Status.NEW.ordinal());
|
||||
ps.setInt(3, maxMessages);
|
||||
rs = ps.executeQuery();
|
||||
List<MessageId> ids = new ArrayList<MessageId>();
|
||||
while(rs.next()) ids.add(new MessageId(rs.getBytes(2)));
|
||||
rs.close();
|
||||
ps.close();
|
||||
if(ids.size() == maxMessages)
|
||||
return Collections.unmodifiableList(ids);
|
||||
// Do we have any sendable group messages?
|
||||
sql = "SELECT m.messageId FROM messages AS m"
|
||||
+ " JOIN contactSubscriptions AS cs"
|
||||
@@ -1547,10 +1555,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " AND timestamp >= start"
|
||||
+ " AND status = ?"
|
||||
+ " AND sendability > ZERO()"
|
||||
+ " ORDER BY timestamp";
|
||||
+ " ORDER BY timestamp"
|
||||
+ " LIMIT ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setShort(2, (short) Status.NEW.ordinal());
|
||||
ps.setInt(3, maxMessages - ids.size());
|
||||
rs = ps.executeQuery();
|
||||
while(rs.next()) ids.add(new MessageId(rs.getBytes(2)));
|
||||
rs.close();
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.Ack;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
|
||||
interface AckFactory {
|
||||
|
||||
Ack createAck(Collection<BatchId> acked);
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.Ack;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
|
||||
class AckFactoryImpl implements AckFactory {
|
||||
|
||||
public Ack createAck(Collection<BatchId> acked) {
|
||||
return new AckImpl(acked);
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import java.util.Collection;
|
||||
import net.sf.briar.api.FormatException;
|
||||
import net.sf.briar.api.protocol.Ack;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.protocol.UniqueId;
|
||||
@@ -16,11 +17,11 @@ import net.sf.briar.api.serial.Reader;
|
||||
|
||||
class AckReader implements ObjectReader<Ack> {
|
||||
|
||||
private final AckFactory ackFactory;
|
||||
private final PacketFactory packetFactory;
|
||||
private final ObjectReader<BatchId> batchIdReader;
|
||||
|
||||
AckReader(AckFactory ackFactory) {
|
||||
this.ackFactory = ackFactory;
|
||||
AckReader(PacketFactory packetFactory) {
|
||||
this.packetFactory = packetFactory;
|
||||
batchIdReader = new BatchIdReader();
|
||||
}
|
||||
|
||||
@@ -36,7 +37,7 @@ class AckReader implements ObjectReader<Ack> {
|
||||
r.removeObjectReader(Types.BATCH_ID);
|
||||
r.removeConsumer(counting);
|
||||
// Build and return the ack
|
||||
return ackFactory.createAck(batches);
|
||||
return packetFactory.createAck(batches);
|
||||
}
|
||||
|
||||
private static class BatchIdReader implements ObjectReader<BatchId> {
|
||||
|
||||
@@ -59,10 +59,6 @@ class MessageImpl implements Message {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public int getLength() {
|
||||
return raw.length;
|
||||
}
|
||||
|
||||
public byte[] getSerialised() {
|
||||
return raw;
|
||||
}
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
|
||||
interface OfferFactory {
|
||||
|
||||
Offer createOffer(Collection<MessageId> offered);
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
|
||||
class OfferFactoryImpl implements OfferFactory {
|
||||
|
||||
public Offer createOffer(Collection<MessageId> offered) {
|
||||
return new OfferImpl(offered);
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.serial.Consumer;
|
||||
@@ -15,12 +16,12 @@ import net.sf.briar.api.serial.Reader;
|
||||
class OfferReader implements ObjectReader<Offer> {
|
||||
|
||||
private final ObjectReader<MessageId> messageIdReader;
|
||||
private final OfferFactory offerFactory;
|
||||
private final PacketFactory packetFactory;
|
||||
|
||||
OfferReader(ObjectReader<MessageId> messageIdReader,
|
||||
OfferFactory offerFactory) {
|
||||
PacketFactory packetFactory) {
|
||||
this.messageIdReader = messageIdReader;
|
||||
this.offerFactory = offerFactory;
|
||||
this.packetFactory = packetFactory;
|
||||
}
|
||||
|
||||
public Offer readObject(Reader r) throws IOException {
|
||||
@@ -35,6 +36,6 @@ class OfferReader implements ObjectReader<Offer> {
|
||||
r.removeObjectReader(Types.MESSAGE_ID);
|
||||
r.removeConsumer(counting);
|
||||
// Build and return the offer
|
||||
return offerFactory.createOffer(messages);
|
||||
return packetFactory.createOffer(messages);
|
||||
}
|
||||
}
|
||||
|
||||
59
components/net/sf/briar/protocol/PacketFactoryImpl.java
Normal file
59
components/net/sf/briar/protocol/PacketFactoryImpl.java
Normal file
@@ -0,0 +1,59 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.BitSet;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
import net.sf.briar.api.crypto.CryptoComponent;
|
||||
import net.sf.briar.api.crypto.MessageDigest;
|
||||
import net.sf.briar.api.protocol.Ack;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
import net.sf.briar.api.protocol.Group;
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.Transport;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
class PacketFactoryImpl implements PacketFactory {
|
||||
|
||||
private final CryptoComponent crypto;
|
||||
|
||||
@Inject
|
||||
PacketFactoryImpl(CryptoComponent crypto) {
|
||||
this.crypto = crypto;
|
||||
}
|
||||
|
||||
public Ack createAck(Collection<BatchId> acked) {
|
||||
return new AckImpl(acked);
|
||||
}
|
||||
|
||||
public RawBatch createBatch(Collection<byte[]> messages) {
|
||||
MessageDigest messageDigest = crypto.getMessageDigest();
|
||||
for(byte[] raw : messages) messageDigest.update(raw);
|
||||
return new RawBatchImpl(new BatchId(messageDigest.digest()), messages);
|
||||
}
|
||||
|
||||
public Offer createOffer(Collection<MessageId> offered) {
|
||||
return new OfferImpl(offered);
|
||||
}
|
||||
|
||||
public Request createRequest(BitSet requested, int length) {
|
||||
return new RequestImpl(requested, length);
|
||||
}
|
||||
|
||||
public SubscriptionUpdate createSubscriptionUpdate(Map<Group, Long> subs,
|
||||
long timestamp) {
|
||||
return new SubscriptionUpdateImpl(subs, timestamp);
|
||||
}
|
||||
|
||||
public TransportUpdate createTransportUpdate(
|
||||
Collection<Transport> transports, long timestamp) {
|
||||
return new TransportUpdateImpl(transports, timestamp);
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,9 @@ import net.sf.briar.api.protocol.GroupFactory;
|
||||
import net.sf.briar.api.protocol.MessageFactory;
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
@@ -23,21 +25,17 @@ public class ProtocolModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(AckFactory.class).to(AckFactoryImpl.class);
|
||||
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
|
||||
bind(GroupFactory.class).to(GroupFactoryImpl.class);
|
||||
bind(MessageFactory.class).to(MessageFactoryImpl.class);
|
||||
bind(OfferFactory.class).to(OfferFactoryImpl.class);
|
||||
bind(PacketFactory.class).to(PacketFactoryImpl.class);
|
||||
bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class);
|
||||
bind(RequestFactory.class).to(RequestFactoryImpl.class);
|
||||
bind(SubscriptionUpdateFactory.class).to(
|
||||
SubscriptionUpdateFactoryImpl.class);
|
||||
bind(TransportUpdateFactory.class).to(TransportUpdateFactoryImpl.class);
|
||||
bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class);
|
||||
bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
ObjectReader<Ack> getAckReader(AckFactory ackFactory) {
|
||||
ObjectReader<Ack> getAckReader(PacketFactory ackFactory) {
|
||||
return new AckReader(ackFactory);
|
||||
}
|
||||
|
||||
@@ -75,25 +73,24 @@ public class ProtocolModule extends AbstractModule {
|
||||
|
||||
@Provides
|
||||
ObjectReader<Offer> getOfferReader(ObjectReader<MessageId> messageIdReader,
|
||||
OfferFactory offerFactory) {
|
||||
return new OfferReader(messageIdReader, offerFactory);
|
||||
PacketFactory packetFactory) {
|
||||
return new OfferReader(messageIdReader, packetFactory);
|
||||
}
|
||||
|
||||
@Provides
|
||||
ObjectReader<Request> getRequestReader(RequestFactory requestFactory) {
|
||||
return new RequestReader(requestFactory);
|
||||
ObjectReader<Request> getRequestReader(PacketFactory packetFactory) {
|
||||
return new RequestReader(packetFactory);
|
||||
}
|
||||
|
||||
@Provides
|
||||
ObjectReader<SubscriptionUpdate> getSubscriptionReader(
|
||||
ObjectReader<Group> groupReader,
|
||||
SubscriptionUpdateFactory subscriptionFactory) {
|
||||
return new SubscriptionUpdateReader(groupReader, subscriptionFactory);
|
||||
ObjectReader<Group> groupReader, PacketFactory packetFactory) {
|
||||
return new SubscriptionUpdateReader(groupReader, packetFactory);
|
||||
}
|
||||
|
||||
@Provides
|
||||
ObjectReader<TransportUpdate> getTransportReader(
|
||||
TransportUpdateFactory transportFactory) {
|
||||
return new TransportUpdateReader(transportFactory);
|
||||
PacketFactory packetFactory) {
|
||||
return new TransportUpdateReader(packetFactory);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.io.OutputStream;
|
||||
|
||||
import net.sf.briar.api.protocol.ProtocolWriter;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
class ProtocolWriterFactoryImpl implements ProtocolWriterFactory {
|
||||
|
||||
private final SerialComponent serial;
|
||||
private final WriterFactory writerFactory;
|
||||
|
||||
@Inject
|
||||
ProtocolWriterFactoryImpl(SerialComponent serial,
|
||||
WriterFactory writerFactory) {
|
||||
this.serial = serial;
|
||||
this.writerFactory = writerFactory;
|
||||
}
|
||||
|
||||
public ProtocolWriter createProtocolWriter(OutputStream out) {
|
||||
return new ProtocolWriterImpl(serial, writerFactory, out);
|
||||
}
|
||||
}
|
||||
143
components/net/sf/briar/protocol/ProtocolWriterImpl.java
Normal file
143
components/net/sf/briar/protocol/ProtocolWriterImpl.java
Normal file
@@ -0,0 +1,143 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.BitSet;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import net.sf.briar.api.protocol.Ack;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
import net.sf.briar.api.protocol.Group;
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
|
||||
import net.sf.briar.api.protocol.ProtocolWriter;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.Transport;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.serial.Writer;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
// This class is not thread-safe
|
||||
class ProtocolWriterImpl implements ProtocolWriter {
|
||||
|
||||
private final SerialComponent serial;
|
||||
private final OutputStream out;
|
||||
private final Writer w;
|
||||
|
||||
ProtocolWriterImpl(SerialComponent serial, WriterFactory writerFactory,
|
||||
OutputStream out) {
|
||||
this.serial = serial;
|
||||
this.out = out;
|
||||
w = writerFactory.createWriter(out);
|
||||
}
|
||||
|
||||
public int getMaxBatchesForAck(long capacity) {
|
||||
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
|
||||
int overhead = serial.getSerialisedStructIdLength(Types.ACK)
|
||||
+ serial.getSerialisedListStartLength()
|
||||
+ serial.getSerialisedListEndLength();
|
||||
int idLength = serial.getSerialisedUniqueIdLength(Types.BATCH_ID);
|
||||
return (packet - overhead) / idLength;
|
||||
}
|
||||
|
||||
public int getMaxMessagesForOffer(long capacity) {
|
||||
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
|
||||
int overhead = serial.getSerialisedStructIdLength(Types.OFFER)
|
||||
+ serial.getSerialisedListStartLength()
|
||||
+ serial.getSerialisedListEndLength();
|
||||
int idLength = serial.getSerialisedUniqueIdLength(Types.MESSAGE_ID);
|
||||
return (packet - overhead) / idLength;
|
||||
}
|
||||
|
||||
public int getMessageCapacityForBatch(long capacity) {
|
||||
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
|
||||
int overhead = serial.getSerialisedStructIdLength(Types.BATCH)
|
||||
+ serial.getSerialisedListStartLength()
|
||||
+ serial.getSerialisedListEndLength();
|
||||
return packet - overhead;
|
||||
}
|
||||
|
||||
public void writeAck(Ack a) throws IOException {
|
||||
w.writeStructId(Types.ACK);
|
||||
w.writeListStart();
|
||||
for(BatchId b : a.getBatchIds()) {
|
||||
w.writeStructId(Types.BATCH_ID);
|
||||
w.writeBytes(b.getBytes());
|
||||
}
|
||||
w.writeListEnd();
|
||||
}
|
||||
|
||||
public void writeBatch(RawBatch b) throws IOException {
|
||||
w.writeStructId(Types.BATCH);
|
||||
w.writeListStart();
|
||||
for(byte[] raw : b.getMessages()) out.write(raw);
|
||||
w.writeListEnd();
|
||||
}
|
||||
|
||||
public void writeOffer(Offer o) throws IOException {
|
||||
w.writeStructId(Types.OFFER);
|
||||
w.writeListStart();
|
||||
for(MessageId m : o.getMessageIds()) {
|
||||
w.writeStructId(Types.MESSAGE_ID);
|
||||
w.writeBytes(m.getBytes());
|
||||
}
|
||||
w.writeListEnd();
|
||||
}
|
||||
|
||||
public void writeRequest(Request r) throws IOException {
|
||||
BitSet b = r.getBitmap();
|
||||
int length = r.getLength();
|
||||
// If the number of bits isn't a multiple of 8, round up to a byte
|
||||
int bytes = length % 8 == 0 ? length / 8 : length / 8 + 1;
|
||||
byte[] bitmap = new byte[bytes];
|
||||
// I'm kind of surprised BitSet doesn't have a method for this
|
||||
for(int i = 0; i < length; i++) {
|
||||
if(b.get(i)) {
|
||||
int offset = i / 8;
|
||||
byte bit = (byte) (128 >> i % 8);
|
||||
bitmap[offset] |= bit;
|
||||
}
|
||||
}
|
||||
w.writeStructId(Types.REQUEST);
|
||||
w.writeUint7((byte) (bytes * 8 - length));
|
||||
w.writeBytes(bitmap);
|
||||
}
|
||||
|
||||
public void writeSubscriptionUpdate(SubscriptionUpdate s)
|
||||
throws IOException {
|
||||
w.writeStructId(Types.SUBSCRIPTION_UPDATE);
|
||||
w.writeMapStart();
|
||||
for(Entry<Group, Long> e : s.getSubscriptions().entrySet()) {
|
||||
writeGroup(w, e.getKey());
|
||||
w.writeInt64(e.getValue());
|
||||
}
|
||||
w.writeMapEnd();
|
||||
w.writeInt64(s.getTimestamp());
|
||||
}
|
||||
|
||||
private void writeGroup(Writer w, Group g) throws IOException {
|
||||
w.writeStructId(Types.GROUP);
|
||||
w.writeString(g.getName());
|
||||
byte[] publicKey = g.getPublicKey();
|
||||
if(publicKey == null) w.writeNull();
|
||||
else w.writeBytes(publicKey);
|
||||
}
|
||||
|
||||
public void writeTransportUpdate(TransportUpdate t) throws IOException {
|
||||
w.writeStructId(Types.TRANSPORT_UPDATE);
|
||||
w.writeListStart();
|
||||
for(Transport p : t.getTransports()) {
|
||||
w.writeStructId(Types.TRANSPORT);
|
||||
w.writeBytes(p.getId().getBytes());
|
||||
w.writeInt32(p.getIndex().getInt());
|
||||
w.writeMap(p);
|
||||
}
|
||||
w.writeListEnd();
|
||||
w.writeInt64(t.getTimestamp());
|
||||
}
|
||||
}
|
||||
25
components/net/sf/briar/protocol/RawBatchImpl.java
Normal file
25
components/net/sf/briar/protocol/RawBatchImpl.java
Normal file
@@ -0,0 +1,25 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
|
||||
class RawBatchImpl implements RawBatch {
|
||||
|
||||
private final BatchId id;
|
||||
private final Collection<byte[]> messages;
|
||||
|
||||
RawBatchImpl(BatchId id, Collection<byte[]> messages) {
|
||||
this.id = id;
|
||||
this.messages = messages;
|
||||
}
|
||||
|
||||
public BatchId getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public Collection<byte[]> getMessages() {
|
||||
return messages;
|
||||
}
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.BitSet;
|
||||
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
|
||||
interface RequestFactory {
|
||||
|
||||
Request createRequest(BitSet requested);
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.BitSet;
|
||||
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
|
||||
class RequestFactoryImpl implements RequestFactory {
|
||||
|
||||
public Request createRequest(BitSet requested) {
|
||||
return new RequestImpl(requested);
|
||||
}
|
||||
}
|
||||
@@ -7,12 +7,18 @@ import net.sf.briar.api.protocol.Request;
|
||||
class RequestImpl implements Request {
|
||||
|
||||
private final BitSet requested;
|
||||
private final int length;
|
||||
|
||||
RequestImpl(BitSet requested) {
|
||||
RequestImpl(BitSet requested, int length) {
|
||||
this.requested = requested;
|
||||
this.length = length;
|
||||
}
|
||||
|
||||
public BitSet getBitmap() {
|
||||
return requested;
|
||||
}
|
||||
|
||||
public int getLength() {
|
||||
return length;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package net.sf.briar.protocol;
|
||||
import java.io.IOException;
|
||||
import java.util.BitSet;
|
||||
|
||||
import net.sf.briar.api.FormatException;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
@@ -13,10 +15,10 @@ import net.sf.briar.api.serial.Reader;
|
||||
|
||||
class RequestReader implements ObjectReader<Request> {
|
||||
|
||||
private final RequestFactory requestFactory;
|
||||
private final PacketFactory packetFactory;
|
||||
|
||||
RequestReader(RequestFactory requestFactory) {
|
||||
this.requestFactory = requestFactory;
|
||||
RequestReader(PacketFactory packetFactory) {
|
||||
this.packetFactory = packetFactory;
|
||||
}
|
||||
|
||||
public Request readObject(Reader r) throws IOException {
|
||||
@@ -26,16 +28,19 @@ class RequestReader implements ObjectReader<Request> {
|
||||
// Read the data
|
||||
r.addConsumer(counting);
|
||||
r.readStructId(Types.REQUEST);
|
||||
int padding = r.readUint7();
|
||||
if(padding > 7) throw new FormatException();
|
||||
byte[] bitmap = r.readBytes(ProtocolConstants.MAX_PACKET_LENGTH);
|
||||
r.removeConsumer(counting);
|
||||
// Convert the bitmap into a BitSet
|
||||
BitSet b = new BitSet(bitmap.length * 8);
|
||||
int length = bitmap.length * 8 - padding;
|
||||
BitSet b = new BitSet(length);
|
||||
for(int i = 0; i < bitmap.length; i++) {
|
||||
for(int j = 0; j < 8; j++) {
|
||||
for(int j = 0; j < 8 && i * 8 + j < length; j++) {
|
||||
byte bit = (byte) (128 >> j);
|
||||
if((bitmap[i] & bit) != 0) b.set(i * 8 + j);
|
||||
}
|
||||
}
|
||||
return requestFactory.createRequest(b);
|
||||
return packetFactory.createRequest(b, length);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import net.sf.briar.api.protocol.Group;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
|
||||
interface SubscriptionUpdateFactory {
|
||||
|
||||
SubscriptionUpdate createSubscriptions(Map<Group, Long> subs,
|
||||
long timestamp);
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import net.sf.briar.api.protocol.Group;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
|
||||
class SubscriptionUpdateFactoryImpl implements SubscriptionUpdateFactory {
|
||||
|
||||
public SubscriptionUpdate createSubscriptions(Map<Group, Long> subs,
|
||||
long timestamp) {
|
||||
return new SubscriptionUpdateImpl(subs, timestamp);
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import java.util.Map;
|
||||
|
||||
import net.sf.briar.api.FormatException;
|
||||
import net.sf.briar.api.protocol.Group;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
@@ -16,12 +17,12 @@ import net.sf.briar.api.serial.Reader;
|
||||
class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
|
||||
|
||||
private final ObjectReader<Group> groupReader;
|
||||
private final SubscriptionUpdateFactory subscriptionFactory;
|
||||
private final PacketFactory packetFactory;
|
||||
|
||||
SubscriptionUpdateReader(ObjectReader<Group> groupReader,
|
||||
SubscriptionUpdateFactory subscriptionFactory) {
|
||||
PacketFactory packetFactory) {
|
||||
this.groupReader = groupReader;
|
||||
this.subscriptionFactory = subscriptionFactory;
|
||||
this.packetFactory = packetFactory;
|
||||
}
|
||||
|
||||
public SubscriptionUpdate readObject(Reader r) throws IOException {
|
||||
@@ -38,6 +39,6 @@ class SubscriptionUpdateReader implements ObjectReader<SubscriptionUpdate> {
|
||||
if(timestamp < 0L) throw new FormatException();
|
||||
r.removeConsumer(counting);
|
||||
// Build and return the subscription update
|
||||
return subscriptionFactory.createSubscriptions(subs, timestamp);
|
||||
return packetFactory.createSubscriptionUpdate(subs, timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.Transport;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
|
||||
interface TransportUpdateFactory {
|
||||
|
||||
TransportUpdate createTransportUpdate(Collection<Transport> transports,
|
||||
long timestamp);
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
package net.sf.briar.protocol;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.Transport;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
|
||||
class TransportUpdateFactoryImpl implements TransportUpdateFactory {
|
||||
|
||||
public TransportUpdate createTransportUpdate(
|
||||
Collection<Transport> transports, long timestamp) {
|
||||
return new TransportUpdateImpl(transports, timestamp);
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import net.sf.briar.api.FormatException;
|
||||
import net.sf.briar.api.protocol.PacketFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.Transport;
|
||||
import net.sf.briar.api.protocol.TransportId;
|
||||
@@ -21,11 +22,11 @@ import net.sf.briar.api.serial.Reader;
|
||||
|
||||
class TransportUpdateReader implements ObjectReader<TransportUpdate> {
|
||||
|
||||
private final TransportUpdateFactory transportUpdateFactory;
|
||||
private final PacketFactory packetFactory;
|
||||
private final ObjectReader<Transport> transportReader;
|
||||
|
||||
TransportUpdateReader(TransportUpdateFactory transportFactory) {
|
||||
this.transportUpdateFactory = transportFactory;
|
||||
TransportUpdateReader(PacketFactory packetFactory) {
|
||||
this.packetFactory = packetFactory;
|
||||
transportReader = new TransportReader();
|
||||
}
|
||||
|
||||
@@ -51,8 +52,7 @@ class TransportUpdateReader implements ObjectReader<TransportUpdate> {
|
||||
if(!indices.add(t.getIndex())) throw new FormatException();
|
||||
}
|
||||
// Build and return the transport update
|
||||
return transportUpdateFactory.createTransportUpdate(transports,
|
||||
timestamp);
|
||||
return packetFactory.createTransportUpdate(transports, timestamp);
|
||||
}
|
||||
|
||||
private static class TransportReader implements ObjectReader<Transport> {
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.protocol.writers.AckWriter;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.serial.Writer;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
class AckWriterImpl implements AckWriter {
|
||||
|
||||
private final OutputStream out;
|
||||
private final int headerLength, idLength, footerLength;
|
||||
private final Writer w;
|
||||
|
||||
private boolean started = false;
|
||||
private int capacity = ProtocolConstants.MAX_PACKET_LENGTH;
|
||||
|
||||
AckWriterImpl(OutputStream out, SerialComponent serial,
|
||||
WriterFactory writerFactory) {
|
||||
this.out = out;
|
||||
headerLength = serial.getSerialisedStructIdLength(Types.ACK)
|
||||
+ serial.getSerialisedListStartLength();
|
||||
idLength = serial.getSerialisedUniqueIdLength(Types.BATCH_ID);
|
||||
footerLength = serial.getSerialisedListEndLength();
|
||||
w = writerFactory.createWriter(out);
|
||||
}
|
||||
|
||||
public void setMaxPacketLength(int length) {
|
||||
if(started) throw new IllegalStateException();
|
||||
if(length < 0 || length > ProtocolConstants.MAX_PACKET_LENGTH)
|
||||
throw new IllegalArgumentException();
|
||||
capacity = length;
|
||||
}
|
||||
|
||||
public boolean writeBatchId(BatchId b) throws IOException {
|
||||
int overhead = started ? footerLength : headerLength + footerLength;
|
||||
if(capacity < idLength + overhead) return false;
|
||||
if(!started) start();
|
||||
w.writeStructId(Types.BATCH_ID);
|
||||
w.writeBytes(b.getBytes());
|
||||
capacity -= idLength;
|
||||
return true;
|
||||
}
|
||||
|
||||
public void finish() throws IOException {
|
||||
if(!started) start();
|
||||
w.writeListEnd();
|
||||
out.flush();
|
||||
capacity = ProtocolConstants.MAX_PACKET_LENGTH;
|
||||
started = false;
|
||||
}
|
||||
|
||||
private void start() throws IOException {
|
||||
w.writeStructId(Types.ACK);
|
||||
w.writeListStart();
|
||||
capacity -= headerLength;
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
@@ -1,78 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import net.sf.briar.api.crypto.MessageDigest;
|
||||
import net.sf.briar.api.protocol.BatchId;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.protocol.writers.BatchWriter;
|
||||
import net.sf.briar.api.serial.DigestingConsumer;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.serial.Writer;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
class BatchWriterImpl implements BatchWriter {
|
||||
|
||||
private final OutputStream out;
|
||||
private final int headerLength, footerLength;
|
||||
private final Writer w;
|
||||
private final MessageDigest messageDigest;
|
||||
private final DigestingConsumer digestingConsumer;
|
||||
|
||||
private boolean started = false;
|
||||
private int capacity = ProtocolConstants.MAX_PACKET_LENGTH;
|
||||
private int remaining = capacity;
|
||||
|
||||
BatchWriterImpl(OutputStream out, SerialComponent serial,
|
||||
WriterFactory writerFactory, MessageDigest messageDigest) {
|
||||
this.out = out;
|
||||
headerLength = serial.getSerialisedStructIdLength(Types.BATCH)
|
||||
+ serial.getSerialisedListStartLength();
|
||||
footerLength = serial.getSerialisedListEndLength();
|
||||
w = writerFactory.createWriter(this.out);
|
||||
this.messageDigest = messageDigest;
|
||||
digestingConsumer = new DigestingConsumer(messageDigest);
|
||||
}
|
||||
|
||||
public int getCapacity() {
|
||||
return capacity - headerLength - footerLength;
|
||||
}
|
||||
|
||||
public void setMaxPacketLength(int length) {
|
||||
if(started) throw new IllegalStateException();
|
||||
if(length < 0 || length > ProtocolConstants.MAX_PACKET_LENGTH)
|
||||
throw new IllegalArgumentException();
|
||||
remaining = capacity = length;
|
||||
}
|
||||
|
||||
public boolean writeMessage(byte[] message) throws IOException {
|
||||
int overhead = started ? footerLength : headerLength + footerLength;
|
||||
if(remaining < message.length + overhead) return false;
|
||||
if(!started) start();
|
||||
// Bypass the writer and write the raw message directly
|
||||
out.write(message);
|
||||
remaining -= message.length;
|
||||
return true;
|
||||
}
|
||||
|
||||
public BatchId finish() throws IOException {
|
||||
if(!started) start();
|
||||
w.writeListEnd();
|
||||
w.removeConsumer(digestingConsumer);
|
||||
out.flush();
|
||||
remaining = capacity = ProtocolConstants.MAX_PACKET_LENGTH;
|
||||
started = false;
|
||||
return new BatchId(messageDigest.digest());
|
||||
}
|
||||
|
||||
private void start() throws IOException {
|
||||
messageDigest.reset();
|
||||
w.addConsumer(digestingConsumer);
|
||||
w.writeStructId(Types.BATCH);
|
||||
w.writeListStart();
|
||||
remaining -= headerLength;
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
@@ -1,64 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.ProtocolConstants;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.protocol.writers.OfferWriter;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.serial.Writer;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
class OfferWriterImpl implements OfferWriter {
|
||||
|
||||
private final OutputStream out;
|
||||
private final int headerLength, idLength, footerLength;
|
||||
private final Writer w;
|
||||
|
||||
private boolean started = false;
|
||||
private int capacity = ProtocolConstants.MAX_PACKET_LENGTH;
|
||||
|
||||
OfferWriterImpl(OutputStream out, SerialComponent serial,
|
||||
WriterFactory writerFactory) {
|
||||
this.out = out;
|
||||
headerLength = serial.getSerialisedStructIdLength(Types.OFFER)
|
||||
+ serial.getSerialisedListStartLength();
|
||||
idLength = serial.getSerialisedUniqueIdLength(Types.MESSAGE_ID);
|
||||
footerLength = serial.getSerialisedListEndLength();
|
||||
w = writerFactory.createWriter(out);
|
||||
}
|
||||
|
||||
public void setMaxPacketLength(int length) {
|
||||
if(started) throw new IllegalStateException();
|
||||
if(length < 0 || length > ProtocolConstants.MAX_PACKET_LENGTH)
|
||||
throw new IllegalArgumentException();
|
||||
capacity = length;
|
||||
}
|
||||
|
||||
public boolean writeMessageId(MessageId m) throws IOException {
|
||||
int overhead = started ? footerLength : headerLength + footerLength;
|
||||
if(capacity < idLength + overhead) return false;
|
||||
if(!started) start();
|
||||
w.writeStructId(Types.MESSAGE_ID);
|
||||
w.writeBytes(m.getBytes());
|
||||
capacity -= idLength;
|
||||
return true;
|
||||
}
|
||||
|
||||
public void finish() throws IOException {
|
||||
if(!started) start();
|
||||
w.writeListEnd();
|
||||
out.flush();
|
||||
capacity = ProtocolConstants.MAX_PACKET_LENGTH;
|
||||
started = false;
|
||||
}
|
||||
|
||||
private void start() throws IOException {
|
||||
w.writeStructId(Types.OFFER);
|
||||
w.writeListStart();
|
||||
capacity -= headerLength;
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
@@ -1,57 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import java.io.OutputStream;
|
||||
|
||||
import net.sf.briar.api.crypto.CryptoComponent;
|
||||
import net.sf.briar.api.crypto.MessageDigest;
|
||||
import net.sf.briar.api.protocol.writers.AckWriter;
|
||||
import net.sf.briar.api.protocol.writers.BatchWriter;
|
||||
import net.sf.briar.api.protocol.writers.OfferWriter;
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.writers.RequestWriter;
|
||||
import net.sf.briar.api.protocol.writers.SubscriptionUpdateWriter;
|
||||
import net.sf.briar.api.protocol.writers.TransportUpdateWriter;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
class ProtocolWriterFactoryImpl implements ProtocolWriterFactory {
|
||||
|
||||
private final MessageDigest messageDigest;
|
||||
private final SerialComponent serial;
|
||||
private final WriterFactory writerFactory;
|
||||
|
||||
@Inject
|
||||
ProtocolWriterFactoryImpl(CryptoComponent crypto,
|
||||
SerialComponent serial, WriterFactory writerFactory) {
|
||||
messageDigest = crypto.getMessageDigest();
|
||||
this.serial = serial;
|
||||
this.writerFactory = writerFactory;
|
||||
}
|
||||
|
||||
public AckWriter createAckWriter(OutputStream out) {
|
||||
return new AckWriterImpl(out, serial, writerFactory);
|
||||
}
|
||||
|
||||
public BatchWriter createBatchWriter(OutputStream out) {
|
||||
return new BatchWriterImpl(out, serial, writerFactory, messageDigest);
|
||||
}
|
||||
|
||||
public OfferWriter createOfferWriter(OutputStream out) {
|
||||
return new OfferWriterImpl(out, serial, writerFactory);
|
||||
}
|
||||
|
||||
public RequestWriter createRequestWriter(OutputStream out) {
|
||||
return new RequestWriterImpl(out, writerFactory);
|
||||
}
|
||||
|
||||
public SubscriptionUpdateWriter createSubscriptionUpdateWriter(
|
||||
OutputStream out) {
|
||||
return new SubscriptionUpdateWriterImpl(out, writerFactory);
|
||||
}
|
||||
|
||||
public TransportUpdateWriter createTransportUpdateWriter(OutputStream out) {
|
||||
return new TransportUpdateWriterImpl(out, writerFactory);
|
||||
}
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
|
||||
public class ProtocolWritersModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class);
|
||||
}
|
||||
}
|
||||
@@ -1,39 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.BitSet;
|
||||
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.protocol.writers.RequestWriter;
|
||||
import net.sf.briar.api.serial.Writer;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
class RequestWriterImpl implements RequestWriter {
|
||||
|
||||
private final OutputStream out;
|
||||
private final Writer w;
|
||||
|
||||
RequestWriterImpl(OutputStream out, WriterFactory writerFactory) {
|
||||
this.out = out;
|
||||
w = writerFactory.createWriter(out);
|
||||
}
|
||||
|
||||
public void writeRequest(BitSet b, int length)
|
||||
throws IOException {
|
||||
w.writeStructId(Types.REQUEST);
|
||||
// If the number of bits isn't a multiple of 8, round up to a byte
|
||||
int bytes = length % 8 == 0 ? length / 8 : length / 8 + 1;
|
||||
byte[] bitmap = new byte[bytes];
|
||||
// I'm kind of surprised BitSet doesn't have a method for this
|
||||
for(int i = 0; i < length; i++) {
|
||||
if(b.get(i)) {
|
||||
int offset = i / 8;
|
||||
byte bit = (byte) (128 >> i % 8);
|
||||
bitmap[offset] |= bit;
|
||||
}
|
||||
}
|
||||
w.writeBytes(bitmap);
|
||||
out.flush();
|
||||
}
|
||||
}
|
||||
@@ -1,45 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import net.sf.briar.api.protocol.Group;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.protocol.writers.SubscriptionUpdateWriter;
|
||||
import net.sf.briar.api.serial.Writer;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
class SubscriptionUpdateWriterImpl implements SubscriptionUpdateWriter {
|
||||
|
||||
private final OutputStream out;
|
||||
private final Writer w;
|
||||
|
||||
SubscriptionUpdateWriterImpl(OutputStream out,
|
||||
WriterFactory writerFactory) {
|
||||
this.out = out;
|
||||
w = writerFactory.createWriter(out);
|
||||
}
|
||||
|
||||
public void writeSubscriptions(Map<Group, Long> subs, long timestamp)
|
||||
throws IOException {
|
||||
w.writeStructId(Types.SUBSCRIPTION_UPDATE);
|
||||
w.writeMapStart();
|
||||
for(Entry<Group, Long> e : subs.entrySet()) {
|
||||
writeGroup(w, e.getKey());
|
||||
w.writeInt64(e.getValue());
|
||||
}
|
||||
w.writeMapEnd();
|
||||
w.writeInt64(timestamp);
|
||||
out.flush();
|
||||
}
|
||||
|
||||
private void writeGroup(Writer w, Group g) throws IOException {
|
||||
w.writeStructId(Types.GROUP);
|
||||
w.writeString(g.getName());
|
||||
byte[] publicKey = g.getPublicKey();
|
||||
if(publicKey == null) w.writeNull();
|
||||
else w.writeBytes(publicKey);
|
||||
}
|
||||
}
|
||||
@@ -1,37 +0,0 @@
|
||||
package net.sf.briar.protocol.writers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collection;
|
||||
|
||||
import net.sf.briar.api.protocol.Transport;
|
||||
import net.sf.briar.api.protocol.Types;
|
||||
import net.sf.briar.api.protocol.writers.TransportUpdateWriter;
|
||||
import net.sf.briar.api.serial.Writer;
|
||||
import net.sf.briar.api.serial.WriterFactory;
|
||||
|
||||
class TransportUpdateWriterImpl implements TransportUpdateWriter {
|
||||
|
||||
private final OutputStream out;
|
||||
private final Writer w;
|
||||
|
||||
TransportUpdateWriterImpl(OutputStream out, WriterFactory writerFactory) {
|
||||
this.out = out;
|
||||
w = writerFactory.createWriter(out);
|
||||
}
|
||||
|
||||
public void writeTransports(Collection<Transport> transports,
|
||||
long timestamp) throws IOException {
|
||||
w.writeStructId(Types.TRANSPORT_UPDATE);
|
||||
w.writeListStart();
|
||||
for(Transport p : transports) {
|
||||
w.writeStructId(Types.TRANSPORT);
|
||||
w.writeBytes(p.getId().getBytes());
|
||||
w.writeInt32(p.getIndex().getInt());
|
||||
w.writeMap(p);
|
||||
}
|
||||
w.writeListEnd();
|
||||
w.writeInt64(timestamp);
|
||||
out.flush();
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,11 @@ class SerialComponentImpl implements SerialComponent {
|
||||
return 1;
|
||||
}
|
||||
|
||||
public int getSerialisedStructIdLength(int id) {
|
||||
if(id < 0 || id > 255) throw new IllegalArgumentException();
|
||||
return id < 32 ? 1 : 2;
|
||||
}
|
||||
|
||||
public int getSerialisedUniqueIdLength(int id) {
|
||||
// Struct ID, BYTES tag, length spec, bytes
|
||||
return getSerialisedStructIdLength(id) + 1
|
||||
@@ -22,14 +27,9 @@ class SerialComponentImpl implements SerialComponent {
|
||||
}
|
||||
|
||||
private int getSerialisedLengthSpecLength(int length) {
|
||||
assert length >= 0;
|
||||
if(length < 0) throw new IllegalArgumentException();
|
||||
if(length < 128) return 1; // Uint7
|
||||
if(length < Short.MAX_VALUE) return 3; // Int16
|
||||
return 5; // Int32
|
||||
}
|
||||
|
||||
public int getSerialisedStructIdLength(int id) {
|
||||
assert id >= 0 && id <= 255;
|
||||
return id < 32 ? 1 : 2;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,8 @@ import java.util.concurrent.Executor;
|
||||
import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.transport.BatchConnectionFactory;
|
||||
import net.sf.briar.api.transport.BatchTransportReader;
|
||||
import net.sf.briar.api.transport.BatchTransportWriter;
|
||||
@@ -19,22 +19,22 @@ import com.google.inject.Inject;
|
||||
class BatchConnectionFactoryImpl implements BatchConnectionFactory {
|
||||
|
||||
private final Executor executor;
|
||||
private final DatabaseComponent db;
|
||||
private final ConnectionReaderFactory connReaderFactory;
|
||||
private final ConnectionWriterFactory connWriterFactory;
|
||||
private final DatabaseComponent db;
|
||||
private final ProtocolReaderFactory protoReaderFactory;
|
||||
private final ProtocolWriterFactory protoWriterFactory;
|
||||
|
||||
@Inject
|
||||
BatchConnectionFactoryImpl(Executor executor,
|
||||
BatchConnectionFactoryImpl(Executor executor, DatabaseComponent db,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory) {
|
||||
this.executor = executor;
|
||||
this.db = db;
|
||||
this.connReaderFactory = connReaderFactory;
|
||||
this.connWriterFactory = connWriterFactory;
|
||||
this.db = db;
|
||||
this.protoReaderFactory = protoReaderFactory;
|
||||
this.protoWriterFactory = protoWriterFactory;
|
||||
}
|
||||
@@ -42,7 +42,7 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
|
||||
public void createIncomingConnection(ConnectionContext ctx,
|
||||
BatchTransportReader r, byte[] tag) {
|
||||
final IncomingBatchConnection conn = new IncomingBatchConnection(
|
||||
executor, connReaderFactory, db, protoReaderFactory, ctx, r,
|
||||
executor, db, connReaderFactory, protoReaderFactory, ctx, r,
|
||||
tag);
|
||||
Runnable read = new Runnable() {
|
||||
public void run() {
|
||||
@@ -54,8 +54,8 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
|
||||
|
||||
public void createOutgoingConnection(ContactId c, TransportIndex i,
|
||||
BatchTransportWriter w) {
|
||||
final OutgoingBatchConnection conn = new OutgoingBatchConnection(
|
||||
connWriterFactory, db, protoWriterFactory, c, i, w);
|
||||
final OutgoingBatchConnection conn = new OutgoingBatchConnection(db,
|
||||
connWriterFactory, protoWriterFactory, c, i, w);
|
||||
Runnable write = new Runnable() {
|
||||
public void run() {
|
||||
conn.write();
|
||||
|
||||
@@ -39,8 +39,8 @@ class IncomingBatchConnection {
|
||||
private final Semaphore semaphore;
|
||||
|
||||
IncomingBatchConnection(Executor executor,
|
||||
ConnectionReaderFactory connFactory,
|
||||
DatabaseComponent db, ProtocolReaderFactory protoFactory,
|
||||
DatabaseComponent db,
|
||||
ConnectionReaderFactory connFactory, ProtocolReaderFactory protoFactory,
|
||||
ConnectionContext ctx, BatchTransportReader reader, byte[] tag) {
|
||||
this.executor = executor;
|
||||
this.connFactory = connFactory;
|
||||
|
||||
@@ -10,12 +10,13 @@ import java.util.logging.Logger;
|
||||
import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.db.DbException;
|
||||
import net.sf.briar.api.protocol.Ack;
|
||||
import net.sf.briar.api.protocol.ProtocolWriter;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.writers.AckWriter;
|
||||
import net.sf.briar.api.protocol.writers.BatchWriter;
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.writers.SubscriptionUpdateWriter;
|
||||
import net.sf.briar.api.protocol.writers.TransportUpdateWriter;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
import net.sf.briar.api.transport.BatchTransportWriter;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionWriter;
|
||||
@@ -26,23 +27,23 @@ class OutgoingBatchConnection {
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(OutgoingBatchConnection.class.getName());
|
||||
|
||||
private final ConnectionWriterFactory connFactory;
|
||||
private final DatabaseComponent db;
|
||||
private final ConnectionWriterFactory connFactory;
|
||||
private final ProtocolWriterFactory protoFactory;
|
||||
private final ContactId contactId;
|
||||
private final TransportIndex transportIndex;
|
||||
private final BatchTransportWriter writer;
|
||||
private final BatchTransportWriter transport;
|
||||
|
||||
OutgoingBatchConnection(ConnectionWriterFactory connFactory,
|
||||
DatabaseComponent db, ProtocolWriterFactory protoFactory,
|
||||
ContactId contactId, TransportIndex transportIndex,
|
||||
BatchTransportWriter writer) {
|
||||
this.connFactory = connFactory;
|
||||
OutgoingBatchConnection(DatabaseComponent db,
|
||||
ConnectionWriterFactory connFactory,
|
||||
ProtocolWriterFactory protoFactory, ContactId contactId,
|
||||
TransportIndex transportIndex, BatchTransportWriter transport) {
|
||||
this.db = db;
|
||||
this.connFactory = connFactory;
|
||||
this.protoFactory = protoFactory;
|
||||
this.contactId = contactId;
|
||||
this.transportIndex = transportIndex;
|
||||
this.writer = writer;
|
||||
this.transport = transport;
|
||||
}
|
||||
|
||||
void write() {
|
||||
@@ -50,45 +51,52 @@ class OutgoingBatchConnection {
|
||||
ConnectionContext ctx = db.getConnectionContext(contactId,
|
||||
transportIndex);
|
||||
ConnectionWriter conn = connFactory.createConnectionWriter(
|
||||
writer.getOutputStream(), writer.getCapacity(),
|
||||
transport.getOutputStream(), transport.getCapacity(),
|
||||
ctx.getSecret());
|
||||
OutputStream out = conn.getOutputStream();
|
||||
ProtocolWriter proto = protoFactory.createProtocolWriter(out);
|
||||
// There should be enough space for a packet
|
||||
long capacity = conn.getRemainingCapacity();
|
||||
if(capacity < MAX_PACKET_LENGTH) throw new IOException();
|
||||
// Write a transport update
|
||||
TransportUpdateWriter t =
|
||||
protoFactory.createTransportUpdateWriter(out);
|
||||
db.generateTransportUpdate(contactId, t);
|
||||
TransportUpdate t = db.generateTransportUpdate(contactId);
|
||||
if(t != null) proto.writeTransportUpdate(t);
|
||||
// If there's space, write a subscription update
|
||||
capacity = conn.getRemainingCapacity();
|
||||
if(capacity >= MAX_PACKET_LENGTH) {
|
||||
SubscriptionUpdateWriter s =
|
||||
protoFactory.createSubscriptionUpdateWriter(out);
|
||||
db.generateSubscriptionUpdate(contactId, s);
|
||||
SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId);
|
||||
if(s != null) proto.writeSubscriptionUpdate(s);
|
||||
}
|
||||
// Write acks until you can't write acks no more
|
||||
AckWriter a = protoFactory.createAckWriter(out);
|
||||
do {
|
||||
capacity = conn.getRemainingCapacity();
|
||||
int maxBatches = proto.getMaxBatchesForAck(capacity);
|
||||
Ack a = db.generateAck(contactId, maxBatches);
|
||||
while(a != null) {
|
||||
proto.writeAck(a);
|
||||
capacity = conn.getRemainingCapacity();
|
||||
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
|
||||
a.setMaxPacketLength(max);
|
||||
} while(db.generateAck(contactId, a));
|
||||
maxBatches = proto.getMaxBatchesForAck(capacity);
|
||||
a = db.generateAck(contactId, maxBatches);
|
||||
}
|
||||
// Write batches until you can't write batches no more
|
||||
BatchWriter b = protoFactory.createBatchWriter(out);
|
||||
do {
|
||||
capacity = conn.getRemainingCapacity();
|
||||
capacity = proto.getMessageCapacityForBatch(capacity);
|
||||
RawBatch b = db.generateBatch(contactId, (int) capacity);
|
||||
while(b != null) {
|
||||
proto.writeBatch(b);
|
||||
capacity = conn.getRemainingCapacity();
|
||||
int max = (int) Math.min(MAX_PACKET_LENGTH, capacity);
|
||||
b.setMaxPacketLength(max);
|
||||
} while(db.generateBatch(contactId, b));
|
||||
capacity = proto.getMessageCapacityForBatch(capacity);
|
||||
b = db.generateBatch(contactId, (int) capacity);
|
||||
}
|
||||
// Flush the output stream
|
||||
out.flush();
|
||||
} catch(DbException e) {
|
||||
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
|
||||
writer.dispose(false);
|
||||
transport.dispose(false);
|
||||
} catch(IOException e) {
|
||||
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
|
||||
writer.dispose(false);
|
||||
transport.dispose(false);
|
||||
}
|
||||
// Success
|
||||
writer.dispose(true);
|
||||
transport.dispose(true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,8 @@ import java.util.concurrent.Executor;
|
||||
import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.db.DbException;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReader;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
@@ -19,14 +20,14 @@ class IncomingStreamConnection extends StreamConnection {
|
||||
private final ConnectionContext ctx;
|
||||
private final byte[] tag;
|
||||
|
||||
IncomingStreamConnection(Executor executor,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
|
||||
IncomingStreamConnection(Executor executor, DatabaseComponent db,
|
||||
SerialComponent serial, ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory,
|
||||
ConnectionContext ctx, StreamTransportConnection connection,
|
||||
byte[] tag) {
|
||||
super(executor, connReaderFactory, connWriterFactory, db,
|
||||
super(executor, db, serial, connReaderFactory, connWriterFactory,
|
||||
protoReaderFactory, protoWriterFactory, ctx.getContactId(),
|
||||
connection);
|
||||
this.ctx = ctx;
|
||||
|
||||
@@ -7,8 +7,9 @@ import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.db.DbException;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReader;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
@@ -22,14 +23,14 @@ class OutgoingStreamConnection extends StreamConnection {
|
||||
|
||||
private ConnectionContext ctx = null; // Locking: this
|
||||
|
||||
OutgoingStreamConnection(Executor executor,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
|
||||
OutgoingStreamConnection(Executor executor, DatabaseComponent db,
|
||||
SerialComponent serial, ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
|
||||
TransportIndex transportIndex,
|
||||
StreamTransportConnection connection) {
|
||||
super(executor, connReaderFactory, connWriterFactory, db,
|
||||
super(executor, db, serial, connReaderFactory, connWriterFactory,
|
||||
protoReaderFactory, protoWriterFactory, contactId, connection);
|
||||
this.transportIndex = transportIndex;
|
||||
}
|
||||
|
||||
@@ -31,17 +31,14 @@ import net.sf.briar.api.protocol.MessageId;
|
||||
import net.sf.briar.api.protocol.Offer;
|
||||
import net.sf.briar.api.protocol.ProtocolReader;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriter;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.RawBatch;
|
||||
import net.sf.briar.api.protocol.Request;
|
||||
import net.sf.briar.api.protocol.SubscriptionUpdate;
|
||||
import net.sf.briar.api.protocol.TransportUpdate;
|
||||
import net.sf.briar.api.protocol.UnverifiedBatch;
|
||||
import net.sf.briar.api.protocol.writers.AckWriter;
|
||||
import net.sf.briar.api.protocol.writers.BatchWriter;
|
||||
import net.sf.briar.api.protocol.writers.OfferWriter;
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.writers.RequestWriter;
|
||||
import net.sf.briar.api.protocol.writers.SubscriptionUpdateWriter;
|
||||
import net.sf.briar.api.protocol.writers.TransportUpdateWriter;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.transport.ConnectionReader;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionWriter;
|
||||
@@ -58,9 +55,10 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
Logger.getLogger(StreamConnection.class.getName());
|
||||
|
||||
protected final Executor executor;
|
||||
protected final DatabaseComponent db;
|
||||
protected final SerialComponent serial;
|
||||
protected final ConnectionReaderFactory connReaderFactory;
|
||||
protected final ConnectionWriterFactory connWriterFactory;
|
||||
protected final DatabaseComponent db;
|
||||
protected final ProtocolReaderFactory protoReaderFactory;
|
||||
protected final ProtocolWriterFactory protoWriterFactory;
|
||||
protected final ContactId contactId;
|
||||
@@ -73,16 +71,17 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
private LinkedList<MessageId> requested = null; // Locking: this
|
||||
private Offer incomingOffer = null; // Locking: this
|
||||
|
||||
StreamConnection(Executor executor,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
|
||||
StreamConnection(Executor executor, DatabaseComponent db,
|
||||
SerialComponent serial, ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
|
||||
StreamTransportConnection connection) {
|
||||
this.executor = executor;
|
||||
this.db = db;
|
||||
this.serial = serial;
|
||||
this.connReaderFactory = connReaderFactory;
|
||||
this.connWriterFactory = connWriterFactory;
|
||||
this.db = db;
|
||||
this.protoReaderFactory = protoReaderFactory;
|
||||
this.protoWriterFactory = protoWriterFactory;
|
||||
this.contactId = contactId;
|
||||
@@ -267,20 +266,11 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
void write() {
|
||||
try {
|
||||
OutputStream out = createConnectionWriter().getOutputStream();
|
||||
// Create the packet writers
|
||||
AckWriter ackWriter = protoWriterFactory.createAckWriter(out);
|
||||
BatchWriter batchWriter = protoWriterFactory.createBatchWriter(out);
|
||||
OfferWriter offerWriter = protoWriterFactory.createOfferWriter(out);
|
||||
RequestWriter requestWriter =
|
||||
protoWriterFactory.createRequestWriter(out);
|
||||
SubscriptionUpdateWriter subscriptionUpdateWriter =
|
||||
protoWriterFactory.createSubscriptionUpdateWriter(out);
|
||||
TransportUpdateWriter transportUpdateWriter =
|
||||
protoWriterFactory.createTransportUpdateWriter(out);
|
||||
ProtocolWriter proto = protoWriterFactory.createProtocolWriter(out);
|
||||
// Send the initial packets: transports, subs, any waiting acks
|
||||
sendTransportUpdate(transportUpdateWriter);
|
||||
sendSubscriptionUpdate(subscriptionUpdateWriter);
|
||||
sendAcks(ackWriter);
|
||||
sendTransportUpdate(proto);
|
||||
sendSubscriptionUpdate(proto);
|
||||
sendAcks(proto);
|
||||
State state = State.SEND_OFFER;
|
||||
// Main loop
|
||||
while(true) {
|
||||
@@ -289,7 +279,7 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
|
||||
case SEND_OFFER:
|
||||
// Try to send an offer
|
||||
if(sendOffer(offerWriter)) state = State.AWAIT_REQUEST;
|
||||
if(sendOffer(proto)) state = State.AWAIT_REQUEST;
|
||||
else state = State.IDLE;
|
||||
break;
|
||||
|
||||
@@ -312,16 +302,16 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
return;
|
||||
}
|
||||
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
|
||||
sendTransportUpdate(transportUpdateWriter);
|
||||
sendTransportUpdate(proto);
|
||||
}
|
||||
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
|
||||
sendSubscriptionUpdate(subscriptionUpdateWriter);
|
||||
sendSubscriptionUpdate(proto);
|
||||
}
|
||||
if((flags & Flags.BATCH_RECEIVED) != 0) {
|
||||
sendAcks(ackWriter);
|
||||
sendAcks(proto);
|
||||
}
|
||||
if((flags & Flags.OFFER_RECEIVED) != 0) {
|
||||
sendRequest(requestWriter);
|
||||
sendRequest(proto);
|
||||
}
|
||||
if((flags & Flags.REQUEST_RECEIVED) != 0) {
|
||||
// Should only be received in state AWAIT_REQUEST
|
||||
@@ -351,16 +341,16 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
return;
|
||||
}
|
||||
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
|
||||
sendTransportUpdate(transportUpdateWriter);
|
||||
sendTransportUpdate(proto);
|
||||
}
|
||||
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
|
||||
sendSubscriptionUpdate(subscriptionUpdateWriter);
|
||||
sendSubscriptionUpdate(proto);
|
||||
}
|
||||
if((flags & Flags.BATCH_RECEIVED) != 0) {
|
||||
sendAcks(ackWriter);
|
||||
sendAcks(proto);
|
||||
}
|
||||
if((flags & Flags.OFFER_RECEIVED) != 0) {
|
||||
sendRequest(requestWriter);
|
||||
sendRequest(proto);
|
||||
}
|
||||
if((flags & Flags.REQUEST_RECEIVED) != 0) {
|
||||
state = State.SEND_BATCHES;
|
||||
@@ -382,16 +372,16 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
return;
|
||||
}
|
||||
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
|
||||
sendTransportUpdate(transportUpdateWriter);
|
||||
sendTransportUpdate(proto);
|
||||
}
|
||||
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
|
||||
sendSubscriptionUpdate(subscriptionUpdateWriter);
|
||||
sendSubscriptionUpdate(proto);
|
||||
}
|
||||
if((flags & Flags.BATCH_RECEIVED) != 0) {
|
||||
sendAcks(ackWriter);
|
||||
sendAcks(proto);
|
||||
}
|
||||
if((flags & Flags.OFFER_RECEIVED) != 0) {
|
||||
sendRequest(requestWriter);
|
||||
sendRequest(proto);
|
||||
}
|
||||
if((flags & Flags.REQUEST_RECEIVED) != 0) {
|
||||
// Should only be received in state AWAIT_REQUEST
|
||||
@@ -401,7 +391,7 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
// Ignored in this state
|
||||
}
|
||||
// Try to send a batch
|
||||
if(!sendBatch(batchWriter)) state = State.SEND_OFFER;
|
||||
if(!sendBatch(proto)) state = State.SEND_OFFER;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -416,11 +406,18 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
connection.dispose(true);
|
||||
}
|
||||
|
||||
private void sendAcks(AckWriter a) throws DbException, IOException {
|
||||
while(db.generateAck(contactId, a));
|
||||
private void sendAcks(ProtocolWriter proto)
|
||||
throws DbException, IOException {
|
||||
int maxBatches = proto.getMaxBatchesForAck(Long.MAX_VALUE);
|
||||
Ack a = db.generateAck(contactId, maxBatches);
|
||||
while(a != null) {
|
||||
proto.writeAck(a);
|
||||
a = db.generateAck(contactId, maxBatches);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean sendBatch(BatchWriter b) throws DbException, IOException {
|
||||
private boolean sendBatch(ProtocolWriter proto)
|
||||
throws DbException, IOException {
|
||||
Collection<MessageId> req;
|
||||
// Retrieve the requested message IDs
|
||||
synchronized(this) {
|
||||
@@ -429,31 +426,40 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
req = requested;
|
||||
}
|
||||
// Try to generate a batch, updating the collection of message IDs
|
||||
boolean anyAdded = db.generateBatch(contactId, b, req);
|
||||
// If no more batches can be generated, discard the remaining IDs
|
||||
if(!anyAdded) {
|
||||
int capacity = proto.getMessageCapacityForBatch(Long.MAX_VALUE);
|
||||
RawBatch b = db.generateBatch(contactId, capacity, req);
|
||||
if(b == null) {
|
||||
// No more batches can be generated - discard the remaining IDs
|
||||
synchronized(this) {
|
||||
assert offered == null;
|
||||
assert requested == req;
|
||||
requested = null;
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
proto.writeBatch(b);
|
||||
return true;
|
||||
}
|
||||
return anyAdded;
|
||||
}
|
||||
|
||||
private boolean sendOffer(OfferWriter o) throws DbException, IOException {
|
||||
private boolean sendOffer(ProtocolWriter proto)
|
||||
throws DbException, IOException {
|
||||
// Generate an offer
|
||||
Collection<MessageId> off = db.generateOffer(contactId, o);
|
||||
int maxMessages = proto.getMaxMessagesForOffer(Long.MAX_VALUE);
|
||||
Offer o = db.generateOffer(contactId, maxMessages);
|
||||
if(o == null) return false;
|
||||
proto.writeOffer(o);
|
||||
// Store the offered message IDs
|
||||
synchronized(this) {
|
||||
assert offered == null;
|
||||
assert requested == null;
|
||||
offered = off;
|
||||
offered = o.getMessageIds();
|
||||
}
|
||||
return !off.isEmpty();
|
||||
return true;
|
||||
}
|
||||
|
||||
private void sendRequest(RequestWriter r) throws DbException, IOException {
|
||||
private void sendRequest(ProtocolWriter proto)
|
||||
throws DbException, IOException {
|
||||
Offer o;
|
||||
// Retrieve the incoming offer
|
||||
synchronized(this) {
|
||||
@@ -462,16 +468,19 @@ abstract class StreamConnection implements DatabaseListener {
|
||||
incomingOffer = null;
|
||||
}
|
||||
// Process the offer and generate a request
|
||||
db.receiveOffer(contactId, o, r);
|
||||
Request r = db.receiveOffer(contactId, o);
|
||||
proto.writeRequest(r);
|
||||
}
|
||||
|
||||
private void sendTransportUpdate(TransportUpdateWriter t)
|
||||
private void sendTransportUpdate(ProtocolWriter proto)
|
||||
throws DbException, IOException {
|
||||
db.generateTransportUpdate(contactId, t);
|
||||
TransportUpdate t = db.generateTransportUpdate(contactId);
|
||||
if(t != null) proto.writeTransportUpdate(t);
|
||||
}
|
||||
|
||||
private void sendSubscriptionUpdate(SubscriptionUpdateWriter s)
|
||||
private void sendSubscriptionUpdate(ProtocolWriter proto)
|
||||
throws DbException, IOException {
|
||||
db.generateSubscriptionUpdate(contactId, s);
|
||||
SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId);
|
||||
if(s != null) proto.writeSubscriptionUpdate(s);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,9 @@ import java.util.concurrent.Executor;
|
||||
import net.sf.briar.api.ContactId;
|
||||
import net.sf.briar.api.db.DatabaseComponent;
|
||||
import net.sf.briar.api.protocol.ProtocolReaderFactory;
|
||||
import net.sf.briar.api.protocol.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.protocol.TransportIndex;
|
||||
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
|
||||
import net.sf.briar.api.serial.SerialComponent;
|
||||
import net.sf.briar.api.transport.ConnectionContext;
|
||||
import net.sf.briar.api.transport.ConnectionReaderFactory;
|
||||
import net.sf.briar.api.transport.ConnectionWriterFactory;
|
||||
@@ -18,31 +19,33 @@ import com.google.inject.Inject;
|
||||
class StreamConnectionFactoryImpl implements StreamConnectionFactory {
|
||||
|
||||
private final Executor executor;
|
||||
private final DatabaseComponent db;
|
||||
private final SerialComponent serial;
|
||||
private final ConnectionReaderFactory connReaderFactory;
|
||||
private final ConnectionWriterFactory connWriterFactory;
|
||||
private final DatabaseComponent db;
|
||||
private final ProtocolReaderFactory protoReaderFactory;
|
||||
private final ProtocolWriterFactory protoWriterFactory;
|
||||
|
||||
@Inject
|
||||
StreamConnectionFactoryImpl(Executor executor,
|
||||
ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
|
||||
StreamConnectionFactoryImpl(Executor executor, DatabaseComponent db,
|
||||
SerialComponent serial, ConnectionReaderFactory connReaderFactory,
|
||||
ConnectionWriterFactory connWriterFactory,
|
||||
ProtocolReaderFactory protoReaderFactory,
|
||||
ProtocolWriterFactory protoWriterFactory) {
|
||||
this.executor = executor;
|
||||
this.db = db;
|
||||
this.serial = serial;
|
||||
this.connReaderFactory = connReaderFactory;
|
||||
this.connWriterFactory = connWriterFactory;
|
||||
this.db = db;
|
||||
this.protoReaderFactory = protoReaderFactory;
|
||||
this.protoWriterFactory = protoWriterFactory;
|
||||
}
|
||||
|
||||
public void createIncomingConnection(ConnectionContext ctx,
|
||||
StreamTransportConnection s, byte[] tag) {
|
||||
final StreamConnection conn = new IncomingStreamConnection(executor,
|
||||
connReaderFactory, connWriterFactory, db, protoReaderFactory,
|
||||
protoWriterFactory, ctx, s, tag);
|
||||
final StreamConnection conn = new IncomingStreamConnection(executor, db,
|
||||
serial, connReaderFactory, connWriterFactory,
|
||||
protoReaderFactory, protoWriterFactory, ctx, s, tag);
|
||||
Runnable write = new Runnable() {
|
||||
public void run() {
|
||||
conn.write();
|
||||
@@ -59,9 +62,9 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
|
||||
|
||||
public void createOutgoingConnection(ContactId c, TransportIndex i,
|
||||
StreamTransportConnection s) {
|
||||
final StreamConnection conn = new OutgoingStreamConnection(executor,
|
||||
connReaderFactory, connWriterFactory, db, protoReaderFactory,
|
||||
protoWriterFactory, c, i, s);
|
||||
final StreamConnection conn = new OutgoingStreamConnection(executor, db,
|
||||
serial, connReaderFactory, connWriterFactory,
|
||||
protoReaderFactory, protoWriterFactory, c, i, s);
|
||||
Runnable write = new Runnable() {
|
||||
public void run() {
|
||||
conn.write();
|
||||
|
||||
Reference in New Issue
Block a user