Removed batches from BMP. Messages are now sent and acked individually.

This commit is contained in:
akwizgran
2013-01-16 22:56:03 +00:00
parent 13cad40004
commit 50ad1f486e
55 changed files with 574 additions and 1666 deletions

View File

@@ -11,7 +11,6 @@ import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
@@ -71,13 +70,6 @@ interface Database<T> {
*/
void commitTransaction(T txn) throws DbException;
/**
* Records a received batch as needing to be acknowledged.
* <p>
* Locking: contact read, messageStatus write.
*/
void addBatchToAck(T txn, ContactId c, BatchId b) throws DbException;
/**
* Adds a new contact to the database and returns an ID for the contact.
* <p>
@@ -101,12 +93,19 @@ interface Database<T> {
boolean addGroupMessage(T txn, Message m) throws DbException;
/**
* Records a sent batch as needing to be acknowledged.
* Records a received message as needing to be acknowledged.
* <p>
* Locking: contact read, messageStatus write.
*/
void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException;
/**
* Records a collection of sent messages as needing to be acknowledged.
* <p>
* Locking: contact read, message read, messageStatus write.
*/
void addOutstandingBatch(T txn, ContactId c, BatchId b,
Collection<MessageId> sent) throws DbException;
void addOutstandingMessages(T txn, ContactId c, Collection<MessageId> sent)
throws DbException;
/**
* Returns false if the given message is already in the database. Otherwise
@@ -196,15 +195,6 @@ interface Database<T> {
boolean containsVisibleSubscription(T txn, GroupId g, ContactId c,
long time) throws DbException;
/**
* Returns the IDs of any batches received from the given contact that need
* to be acknowledged.
* <p>
* Locking: contact read, messageStatus read.
*/
Collection<BatchId> getBatchesToAck(T txn, ContactId c, int maxBatches)
throws DbException;
/**
* Returns the configuration for the given transport.
* <p>
@@ -267,12 +257,13 @@ interface Database<T> {
Collection<Transport> getLocalTransports(T txn) throws DbException;
/**
* Returns the IDs of any batches sent to the given contact that should now
* be considered lost.
* Returns the IDs of any messages sent to the given contact that should
* now be considered lost.
* <p>
* Locking: contact read, message read, messageStatus read.
*/
Collection<BatchId> getLostBatches(T txn, ContactId c) throws DbException;
Collection<MessageId> getLostMessages(T txn, ContactId c)
throws DbException;
/**
* Returns the message identified by the given ID, in serialised form.
@@ -315,6 +306,15 @@ interface Database<T> {
Collection<MessageId> getMessagesByAuthor(T txn, AuthorId a)
throws DbException;
/**
* Returns the IDs of any messages received from the given contact that
* need to be acknowledged.
* <p>
* Locking: contact read, messageStatus read.
*/
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
/**
* Returns the number of children of the message identified by the given
* ID that are present in the database and have sendability scores greater
@@ -380,12 +380,13 @@ interface Database<T> {
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, with a total size less than or equal to the given size.
* given contact, with a total length less than or equal to the given
* length.
* <p>
* Locking: contact read, message read, messageStatus read,
* subscription read.
*/
Collection<MessageId> getSendableMessages(T txn, ContactId c, int capacity)
Collection<MessageId> getSendableMessages(T txn, ContactId c, int maxLength)
throws DbException;
/**
@@ -493,21 +494,22 @@ interface Database<T> {
throws DbException;
/**
* Removes an outstanding batch that has been acknowledged. Any messages in
* the batch that are still considered outstanding (Status.SENT) with
* Removes outstanding messages that have been acknowledged. Any of the
* messages that are still considered outstanding (Status.SENT) with
* respect to the given contact are now considered seen (Status.SEEN).
* <p>
* Locking: contact read, message read, messageStatus write.
*/
void removeAckedBatch(T txn, ContactId c, BatchId b) throws DbException;
void removeAckedMessages(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
/**
* Marks the given batches received from the given contact as having been
* Marks the given messages received from the given contact as having been
* acknowledged.
* <p>
* Locking: contact read, messageStatus write.
*/
void removeBatchesToAck(T txn, ContactId c, Collection<BatchId> sent)
void removeMessagesToAck(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
/**
@@ -519,13 +521,14 @@ interface Database<T> {
void removeContact(T txn, ContactId c) throws DbException;
/**
* Removes an outstanding batch that has been lost. Any messages in the
* batch that are still considered outstanding (Status.SENT) with respect
* to the given contact are now considered unsent (Status.NEW).
* Removes outstanding messages that have been lost. Any messages that are
* still considered outstanding (Status.SENT) with respect to the given
* contact are now considered unsent (Status.NEW).
* <p>
* Locking: contact read, message read, messageStatus write.
*/
void removeLostBatch(T txn, ContactId c, BatchId b) throws DbException;
void removeLostMessages(T txn, ContactId c, Collection<MessageId> lost)
throws DbException;
/**
* Removes a message (and all associated state) from the database.

View File

@@ -32,28 +32,25 @@ import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader;
import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.db.NoSuchContactTransportException;
import net.sf.briar.api.db.event.BatchReceivedEvent;
import net.sf.briar.api.db.event.ContactAddedEvent;
import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseEvent;
import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.db.event.MessagesAddedEvent;
import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.MessageReceivedEvent;
import net.sf.briar.api.db.event.RatingChangedEvent;
import net.sf.briar.api.db.event.RemoteTransportsUpdatedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
import net.sf.briar.api.lifecycle.ShutdownManager;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.RawBatch;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Transport;
@@ -270,7 +267,7 @@ DatabaseCleaner.Callback {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(added) callListeners(new MessagesAddedEvent());
if(added) callListeners(new MessageAddedEvent());
}
/**
@@ -388,7 +385,7 @@ DatabaseCleaner.Callback {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(added) callListeners(new MessagesAddedEvent());
if(added) callListeners(new MessageAddedEvent());
}
public void addSecrets(Collection<TemporarySecret> secrets)
@@ -444,8 +441,8 @@ DatabaseCleaner.Callback {
return true;
}
public Ack generateAck(ContactId c, int maxBatches) throws DbException {
Collection<BatchId> acked;
public Ack generateAck(ContactId c, int maxMessages) throws DbException {
Collection<MessageId> acked;
contactLock.readLock().lock();
try {
messageStatusLock.readLock().lock();
@@ -454,7 +451,7 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
acked = db.getBatchesToAck(txn, c, maxBatches);
acked = db.getMessagesToAck(txn, c, maxMessages);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -469,7 +466,7 @@ DatabaseCleaner.Callback {
try {
T txn = db.startTransaction();
try {
db.removeBatchesToAck(txn, c, acked);
db.removeMessagesToAck(txn, c, acked);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -484,11 +481,10 @@ DatabaseCleaner.Callback {
return packetFactory.createAck(acked);
}
public RawBatch generateBatch(ContactId c, int capacity)
public Collection<byte[]> generateBatch(ContactId c, int maxLength)
throws DbException {
Collection<MessageId> ids;
List<byte[]> messages = new ArrayList<byte[]>();
RawBatch b;
// Get some sendable messages from the database
contactLock.readLock().lock();
try {
@@ -502,7 +498,7 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
ids = db.getSendableMessages(txn, c, capacity);
ids = db.getSendableMessages(txn, c, maxLength);
for(MessageId m : ids) {
messages.add(db.getMessage(txn, m));
}
@@ -518,13 +514,11 @@ DatabaseCleaner.Callback {
messageStatusLock.readLock().unlock();
}
if(messages.isEmpty()) return null;
messages = Collections.unmodifiableList(messages);
b = packetFactory.createBatch(messages);
messageStatusLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
db.addOutstandingBatch(txn, c, b.getId(), ids);
db.addOutstandingMessages(txn, c, ids);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -539,14 +533,13 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
return b;
return Collections.unmodifiableList(messages);
}
public RawBatch generateBatch(ContactId c, int capacity,
public Collection<byte[]> generateBatch(ContactId c, int maxLength,
Collection<MessageId> requested) throws DbException {
Collection<MessageId> ids = new ArrayList<MessageId>();
List<byte[]> messages = new ArrayList<byte[]>();
RawBatch b;
// Get some sendable messages from the database
contactLock.readLock().lock();
try {
@@ -565,10 +558,10 @@ DatabaseCleaner.Callback {
MessageId m = it.next();
byte[] raw = db.getMessageIfSendable(txn, c, m);
if(raw != null) {
if(raw.length > capacity) break;
if(raw.length > maxLength) break;
messages.add(raw);
ids.add(m);
capacity -= raw.length;
maxLength -= raw.length;
}
it.remove();
}
@@ -584,13 +577,11 @@ DatabaseCleaner.Callback {
messageStatusLock.readLock().unlock();
}
if(messages.isEmpty()) return null;
messages = Collections.unmodifiableList(messages);
b = packetFactory.createBatch(messages);
messageStatusLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
db.addOutstandingBatch(txn, c, b.getId(), ids);
db.addOutstandingMessages(txn, c, ids);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -605,7 +596,7 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
return b;
return Collections.unmodifiableList(messages);
}
public Offer generateOffer(ContactId c, int maxMessages)
@@ -1057,12 +1048,12 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<BatchId> acks = a.getBatchIds();
// Mark all messages in acked batches as seen
for(BatchId b : acks) db.removeAckedBatch(txn, c, b);
// Find any lost batches that need to be retransmitted
Collection<BatchId> lost = db.getLostBatches(txn, c);
for(BatchId b : lost) db.removeLostBatch(txn, c, b);
// Mark all acked messages as seen
db.removeAckedMessages(txn, c, a.getMessageIds());
// Find any lost messages that need to be retransmitted
// FIXME: Merge these methods
Collection<MessageId> lost = db.getLostMessages(txn, c);
if(!lost.isEmpty()) db.removeLostMessages(txn, c, lost);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1079,8 +1070,8 @@ DatabaseCleaner.Callback {
}
}
public void receiveBatch(ContactId c, Batch b) throws DbException {
boolean anyAdded = false;
public void receiveMessage(ContactId c, Message m) throws DbException {
boolean added = false;
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
@@ -1093,8 +1084,8 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
anyAdded = storeMessages(txn, c, b.getMessages());
db.addBatchToAck(txn, c, b.getId());
added = storeMessage(txn, c, m);
db.addMessageToAck(txn, c, m.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1113,32 +1104,24 @@ DatabaseCleaner.Callback {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
callListeners(new BatchReceivedEvent());
if(anyAdded) callListeners(new MessagesAddedEvent());
callListeners(new MessageReceivedEvent());
if(added) callListeners(new MessageAddedEvent());
}
/**
* Attempts to store a collection of messages received from the given
* contact, and returns true if any were stored.
* Attempts to store a message received from the given contact, and returns
* true if it was stored.
* <p>
* Locking: contact read, message write, messageStatus write,
* subscription read.
*/
private boolean storeMessages(T txn, ContactId c,
Collection<Message> messages) throws DbException {
boolean anyStored = false;
for(Message m : messages) {
GroupId g = m.getGroup();
if(g == null) {
if(storePrivateMessage(txn, m, c, true)) anyStored = true;
} else {
long timestamp = m.getTimestamp();
if(db.containsVisibleSubscription(txn, g, c, timestamp)) {
if(storeGroupMessage(txn, m, c)) anyStored = true;
}
}
}
return anyStored;
private boolean storeMessage(T txn, ContactId c, Message m)
throws DbException {
GroupId g = m.getGroup();
if(g == null) return storePrivateMessage(txn, m, c, true);
if(!db.containsVisibleSubscription(txn, g, c, m.getTimestamp()))
return false;
return storeGroupMessage(txn, m, c);
}
public Request receiveOffer(ContactId c, Offer o) throws DbException {

View File

@@ -42,12 +42,6 @@ interface DatabaseConstants {
*/
long EXPIRY_MODULUS = 60L * 60L * 1000L; // 1 hour
/**
* A batch sent to a contact is considered lost when this many more
* recently sent batches have been acknowledged.
*/
int RETRANSMIT_THRESHOLD = 5;
/**
* The time in milliseconds after which a subscription or transport update
* should be sent to a contact even if no changes have occurred.

View File

@@ -3,7 +3,6 @@ package net.sf.briar.db;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static net.sf.briar.db.DatabaseConstants.EXPIRY_MODULUS;
import static net.sf.briar.db.DatabaseConstants.RETRANSMIT_THRESHOLD;
import java.io.File;
import java.io.FileNotFoundException;
@@ -33,7 +32,6 @@ import net.sf.briar.api.db.DbClosedException;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.MessageHeader;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
@@ -112,11 +110,11 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_VISIBILITIES_BY_NEXT =
"CREATE INDEX visibilitiesByNext on visibilities (nextId)";
private static final String CREATE_BATCHES_TO_ACK =
"CREATE TABLE batchesToAck"
+ " (batchId HASH NOT NULL,"
private static final String CREATE_MESSAGES_TO_ACK =
"CREATE TABLE messagesToAck"
+ " (messageId HASH NOT NULL,"
+ " contactId INT NOT NULL,"
+ " PRIMARY KEY (batchId, contactId),"
+ " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
@@ -131,32 +129,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_OUTSTANDING_BATCHES =
"CREATE TABLE outstandingBatches"
+ " (batchId HASH NOT NULL,"
+ " contactId INT NOT NULL,"
+ " timestamp BIGINT NOT NULL,"
+ " passover INT NOT NULL,"
+ " PRIMARY KEY (batchId, contactId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_OUTSTANDING_MESSAGES =
"CREATE TABLE outstandingMessages"
+ " (batchId HASH NOT NULL,"
+ " contactId INT NOT NULL,"
+ " messageId HASH NOT NULL,"
+ " PRIMARY KEY (batchId, contactId, messageId),"
+ " FOREIGN KEY (batchId, contactId)"
+ " REFERENCES outstandingBatches (batchId, contactId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (messageId) REFERENCES messages (messageId)"
+ " ON DELETE CASCADE)";
private static final String INDEX_OUTSTANDING_MESSAGES_BY_BATCH =
"CREATE INDEX outstandingMessagesByBatch"
+ " ON outstandingMessages (batchId)";
private static final String CREATE_RATINGS =
"CREATE TABLE ratings"
+ " (authorId HASH NOT NULL,"
@@ -322,11 +294,8 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(insertTypeNames(CREATE_VISIBILITIES));
s.executeUpdate(INDEX_VISIBILITIES_BY_GROUP);
s.executeUpdate(INDEX_VISIBILITIES_BY_NEXT);
s.executeUpdate(insertTypeNames(CREATE_BATCHES_TO_ACK));
s.executeUpdate(insertTypeNames(CREATE_MESSAGES_TO_ACK));
s.executeUpdate(insertTypeNames(CREATE_CONTACT_SUBSCRIPTIONS));
s.executeUpdate(insertTypeNames(CREATE_OUTSTANDING_BATCHES));
s.executeUpdate(insertTypeNames(CREATE_OUTSTANDING_MESSAGES));
s.executeUpdate(INDEX_OUTSTANDING_MESSAGES_BY_BATCH);
s.executeUpdate(insertTypeNames(CREATE_RATINGS));
s.executeUpdate(insertTypeNames(CREATE_STATUSES));
s.executeUpdate(INDEX_STATUSES_BY_MESSAGE);
@@ -452,37 +421,6 @@ abstract class JdbcDatabase implements Database<Connection> {
if(interrupted) Thread.currentThread().interrupt();
}
public void addBatchToAck(Connection txn, ContactId c, BatchId b)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT NULL FROM batchesToAck"
+ " WHERE batchId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, b.getBytes());
ps.setInt(2, c.getInt());
rs = ps.executeQuery();
boolean found = rs.next();
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
if(found) return;
sql = "INSERT INTO batchesToAck (batchId, contactId)"
+ " VALUES (?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, b.getBytes());
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public ContactId addContact(Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -596,42 +534,30 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void addOutstandingBatch(Connection txn, ContactId c, BatchId b,
public void addMessageToAck(Connection txn, ContactId c, MessageId m)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO messagesToAck (messageId, contactId)"
+ " VALUES (?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void addOutstandingMessages(Connection txn, ContactId c,
Collection<MessageId> sent) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Create an outstanding batch row
String sql = "INSERT INTO outstandingBatches"
+ " (batchId, contactId, timestamp, passover)"
+ " VALUES (?, ?, ?, ZERO())";
ps = txn.prepareStatement(sql);
ps.setBytes(1, b.getBytes());
ps.setInt(2, c.getInt());
ps.setLong(3, clock.currentTimeMillis());
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
// Create an outstanding message row for each message in the batch
sql = "INSERT INTO outstandingMessages"
+ " (batchId, contactId, messageId)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, b.getBytes());
ps.setInt(2, c.getInt());
for(MessageId m : sent) {
ps.setBytes(3, m.getBytes());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != sent.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
// Set the status of each message in the batch to SENT
sql = "UPDATE statuses SET status = ?"
// Set the status of each message to SENT if it's currently NEW
String sql = "UPDATE statuses SET status = ?"
+ " WHERE messageId = ? AND contactId = ? AND status = ?";
ps = txn.prepareStatement(sql);
ps.setShort(1, (short) Status.SENT.ordinal());
@@ -641,7 +567,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setBytes(2, m.getBytes());
ps.addBatch();
}
batchAffected = ps.executeBatch();
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != sent.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
@@ -649,7 +575,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
@@ -1006,30 +931,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<BatchId> getBatchesToAck(Connection txn, ContactId c,
int maxBatches) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT batchId FROM batchesToAck"
+ " WHERE contactId = ?"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, maxBatches);
rs = ps.executeQuery();
List<BatchId> ids = new ArrayList<BatchId>();
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public TransportConfig getConfig(Connection txn, TransportId t)
throws DbException {
PreparedStatement ps = null;
@@ -1216,27 +1117,10 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<BatchId> getLostBatches(Connection txn, ContactId c)
public Collection<MessageId> getLostMessages(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT batchId FROM outstandingBatches"
+ " WHERE contactId = ? AND passover >= ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, RETRANSMIT_THRESHOLD);
rs = ps.executeQuery();
List<BatchId> ids = new ArrayList<BatchId>();
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
// FIXME: Retransmission
return Collections.emptyList();
}
public byte[] getMessage(Connection txn, MessageId m) throws DbException {
@@ -1411,6 +1295,30 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<MessageId> getMessagesToAck(Connection txn, ContactId c,
int maxMessages) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messageId FROM messagesToAck"
+ " WHERE contactId = ?"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
while(rs.next()) ids.add(new MessageId(rs.getBytes(1)));
rs.close();
ps.close();
return Collections.unmodifiableList(ids);
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public int getNumberOfSendableChildren(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
@@ -1670,7 +1578,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
public Collection<MessageId> getSendableMessages(Connection txn,
ContactId c, int capacity) throws DbException {
ContactId c, int maxLength) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -1688,13 +1596,13 @@ abstract class JdbcDatabase implements Database<Connection> {
int total = 0;
while(rs.next()) {
int length = rs.getInt(1);
if(total + length > capacity) break;
if(total + length > maxLength) break;
ids.add(new MessageId(rs.getBytes(2)));
total += length;
}
rs.close();
ps.close();
if(total == capacity) return Collections.unmodifiableList(ids);
if(total == maxLength) return Collections.unmodifiableList(ids);
// Do we have any sendable group messages?
sql = "SELECT length, m.messageId FROM messages AS m"
+ " JOIN contactSubscriptions AS cs"
@@ -1719,7 +1627,7 @@ abstract class JdbcDatabase implements Database<Connection> {
rs = ps.executeQuery();
while(rs.next()) {
int length = rs.getInt(1);
if(total + length > capacity) break;
if(total + length > maxLength) break;
ids.add(new MessageId(rs.getBytes(2)));
total += length;
}
@@ -2067,99 +1975,52 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void removeAckedBatch(Connection txn, ContactId c, BatchId b)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT timestamp FROM outstandingBatches"
+ " WHERE contactId = ? AND batchId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, b.getBytes());
rs = ps.executeQuery();
if(!rs.next()) throw new DbStateException();
long timestamp = rs.getLong(1);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
// Increment the passover count of all older outstanding batches
sql = "UPDATE outstandingBatches SET passover = passover + ?"
+ " WHERE contactId = ? AND timestamp < ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, 1);
ps.setInt(2, c.getInt());
ps.setLong(3, timestamp);
ps.executeUpdate();
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
removeBatch(txn, c, b, Status.SEEN);
public void removeAckedMessages(Connection txn, ContactId c,
Collection<MessageId> acked) throws DbException {
setStatus(txn, c, acked, Status.SEEN);
}
private void removeBatch(Connection txn, ContactId c, BatchId b,
Status newStatus) throws DbException {
PreparedStatement ps = null, ps1 = null;
ResultSet rs = null;
private void setStatus(Connection txn, ContactId c,
Collection<MessageId> ids, Status newStatus) throws DbException {
PreparedStatement ps = null;
try {
String sql = "SELECT messageId FROM outstandingMessages"
+ " WHERE contactId = ? AND batchId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, b.getBytes());
rs = ps.executeQuery();
sql = "UPDATE statuses SET status = ?"
// Set the status of each message if it's currently SENT
String sql = "UPDATE statuses SET status = ?"
+ " WHERE messageId = ? AND contactId = ? AND status = ?";
ps1 = txn.prepareStatement(sql);
ps1.setShort(1, (short) newStatus.ordinal());
ps1.setInt(3, c.getInt());
ps1.setShort(4, (short) Status.SENT.ordinal());
int messages = 0;
while(rs.next()) {
messages++;
ps1.setBytes(2, rs.getBytes(1));
ps1.addBatch();
}
rs.close();
ps.close();
int[] batchAffected = ps1.executeBatch();
if(batchAffected.length != messages) throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] > 1) throw new DbStateException();
}
ps1.close();
// Cascade on delete
sql = "DELETE FROM outstandingBatches WHERE batchId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, b.getBytes());
int affected = ps.executeUpdate();
if(affected > 1) throw new DbStateException();
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(ps1);
throw new DbException(e);
}
}
public void removeBatchesToAck(Connection txn, ContactId c,
Collection<BatchId> sent) throws DbException {
PreparedStatement ps = null;
try {
String sql = "DELETE FROM batchesToAck"
+ " WHERE contactId = ? and batchId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for(BatchId b : sent) {
ps.setBytes(2, b.getBytes());
ps.setShort(1, (short) newStatus.ordinal());
ps.setInt(3, c.getInt());
ps.setShort(4, (short) Status.SENT.ordinal());
for(MessageId m : ids) {
ps.setBytes(2, m.getBytes());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != sent.size())
if(batchAffected.length != ids.size()) throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] > 1) throw new DbStateException();
}
ps.close();
} catch(SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void removeMessagesToAck(Connection txn, ContactId c,
Collection<MessageId> acked) throws DbException {
PreparedStatement ps = null;
try {
String sql = "DELETE FROM messagesToAck"
+ " WHERE contactId = ? AND messageId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for(MessageId m : acked) {
ps.setBytes(2, m.getBytes());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if(batchAffected.length != acked.size())
throw new DbStateException();
for(int i = 0; i < batchAffected.length; i++) {
if(batchAffected[i] != 1) throw new DbStateException();
@@ -2187,9 +2048,9 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void removeLostBatch(Connection txn, ContactId c, BatchId b)
throws DbException {
removeBatch(txn, c, b, Status.NEW);
public void removeLostMessages(Connection txn, ContactId c,
Collection<MessageId> lost) throws DbException {
setStatus(txn, c, lost, Status.NEW);
}
public void removeMessage(Connection txn, MessageId m) throws DbException {

View File

@@ -3,17 +3,17 @@ package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.MessageId;
class AckImpl implements Ack {
private final Collection<BatchId> acked;
private final Collection<MessageId> acked;
AckImpl(Collection<BatchId> acked) {
AckImpl(Collection<MessageId> acked) {
this.acked = acked;
}
public Collection<BatchId> getBatchIds() {
public Collection<MessageId> getMessageIds() {
return acked;
}
}

View File

@@ -10,7 +10,7 @@ import java.util.List;
import net.sf.briar.api.Bytes;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.Types;
import net.sf.briar.api.protocol.UniqueId;
@@ -38,14 +38,14 @@ class AckReader implements StructReader<Ack> {
r.resetMaxBytesLength();
r.removeConsumer(counting);
if(raw.isEmpty()) throw new FormatException();
// Convert the byte arrays to batch IDs
List<BatchId> batches = new ArrayList<BatchId>();
// Convert the byte arrays to message IDs
List<MessageId> acked = new ArrayList<MessageId>();
for(Bytes b : raw) {
if(b.getBytes().length != UniqueId.LENGTH)
throw new FormatException();
batches.add(new BatchId(b.getBytes()));
acked.add(new MessageId(b.getBytes()));
}
// Build and return the ack
return packetFactory.createAck(Collections.unmodifiableList(batches));
return packetFactory.createAck(Collections.unmodifiableList(acked));
}
}

View File

@@ -1,27 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Message;
/** A simple in-memory implementation of a batch. */
class BatchImpl implements Batch {
private final BatchId id;
private final Collection<Message> messages;
BatchImpl(BatchId id, Collection<Message> messages) {
this.id = id;
this.messages = messages;
}
public BatchId getId() {
return id;
}
public Collection<Message> getMessages() {
return messages;
}
}

View File

@@ -1,41 +0,0 @@
package net.sf.briar.protocol;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import java.io.IOException;
import java.util.List;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.Types;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.serial.Consumer;
import net.sf.briar.api.serial.CountingConsumer;
import net.sf.briar.api.serial.StructReader;
import net.sf.briar.api.serial.Reader;
class BatchReader implements StructReader<UnverifiedBatch> {
private final StructReader<UnverifiedMessage> messageReader;
private final UnverifiedBatchFactory batchFactory;
BatchReader(StructReader<UnverifiedMessage> messageReader,
UnverifiedBatchFactory batchFactory) {
this.messageReader = messageReader;
this.batchFactory = batchFactory;
}
public UnverifiedBatch readStruct(Reader r) throws IOException {
// Initialise the consumer
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
// Read the data
r.addConsumer(counting);
r.readStructId(Types.BATCH);
r.addStructReader(Types.MESSAGE, messageReader);
List<UnverifiedMessage> messages = r.readList(UnverifiedMessage.class);
r.removeStructReader(Types.MESSAGE);
r.removeConsumer(counting);
if(messages.isEmpty()) throw new FormatException();
// Build and return the batch
return batchFactory.createUnverifiedBatch( messages);
}
}

View File

@@ -14,10 +14,11 @@ import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Types;
import net.sf.briar.api.protocol.UniqueId;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.serial.CopyingConsumer;
import net.sf.briar.api.serial.CountingConsumer;
import net.sf.briar.api.serial.StructReader;
import net.sf.briar.api.serial.Reader;
import net.sf.briar.api.serial.StructReader;
class MessageReader implements StructReader<UnverifiedMessage> {

View File

@@ -3,63 +3,44 @@ package net.sf.briar.protocol;
import java.security.GeneralSecurityException;
import java.security.PublicKey;
import java.security.Signature;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.crypto.KeyParser;
import net.sf.briar.api.crypto.MessageDigest;
import net.sf.briar.api.protocol.Author;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.UnverifiedMessage;
class UnverifiedBatchImpl implements UnverifiedBatch {
import com.google.inject.Inject;
class MessageVerifierImpl implements MessageVerifier {
private final CryptoComponent crypto;
private final Collection<UnverifiedMessage> messages;
private final MessageDigest batchDigest, messageDigest;
private final KeyParser keyParser;
// Initialise lazily - the batch may contain unsigned messages
private KeyParser keyParser = null;
private Signature signature = null;
UnverifiedBatchImpl(CryptoComponent crypto,
Collection<UnverifiedMessage> messages) {
@Inject
MessageVerifierImpl(CryptoComponent crypto) {
this.crypto = crypto;
this.messages = messages;
batchDigest = crypto.getMessageDigest();
messageDigest = crypto.getMessageDigest();
keyParser = crypto.getSignatureKeyParser();
}
public Batch verify() throws GeneralSecurityException {
List<Message> verified = new ArrayList<Message>();
for(UnverifiedMessage m : messages) verified.add(verify(m));
BatchId id = new BatchId(batchDigest.digest());
return new BatchImpl(id, Collections.unmodifiableList(verified));
}
private Message verify(UnverifiedMessage m)
throws GeneralSecurityException {
// The batch ID is the hash of the concatenated messages
byte[] raw = m.getRaw();
batchDigest.update(raw);
public Message verifyMessage(UnverifiedMessage m)
throws GeneralSecurityException {
MessageDigest messageDigest = crypto.getMessageDigest();
Signature signature = crypto.getSignature();
// Hash the message, including the signatures, to get the message ID
byte[] raw = m.getSerialised();
messageDigest.update(raw);
MessageId id = new MessageId(messageDigest.digest());
// Verify the author's signature, if there is one
Author author = m.getAuthor();
if(author != null) {
if(keyParser == null) keyParser = crypto.getSignatureKeyParser();
PublicKey k = keyParser.parsePublicKey(author.getPublicKey());
if(signature == null) signature = crypto.getSignature();
signature.initVerify(k);
signature.update(raw, 0, m.getLengthSignedByAuthor());
if(!signature.verify(m.getAuthorSignature()))
@@ -68,9 +49,7 @@ class UnverifiedBatchImpl implements UnverifiedBatch {
// Verify the group's signature, if there is one
Group group = m.getGroup();
if(group != null && group.getPublicKey() != null) {
if(keyParser == null) keyParser = crypto.getSignatureKeyParser();
PublicKey k = keyParser.parsePublicKey(group.getPublicKey());
if(signature == null) signature = crypto.getSignature();
signature.initVerify(k);
signature.update(raw, 0, m.getLengthSignedByGroup());
if(!signature.verify(m.getGroupSignature()))

View File

@@ -4,42 +4,23 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.crypto.MessageDigest;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.RawBatch;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.protocol.TransportUpdate;
import com.google.inject.Inject;
class PacketFactoryImpl implements PacketFactory {
private final CryptoComponent crypto;
@Inject
PacketFactoryImpl(CryptoComponent crypto) {
this.crypto = crypto;
}
public Ack createAck(Collection<BatchId> acked) {
public Ack createAck(Collection<MessageId> acked) {
return new AckImpl(acked);
}
public RawBatch createBatch(Collection<byte[]> messages) {
MessageDigest messageDigest = crypto.getMessageDigest();
for(byte[] raw : messages) messageDigest.update(raw);
return new RawBatchImpl(new BatchId(messageDigest.digest()), messages);
}
public Offer createOffer(Collection<MessageId> offered) {
return new OfferImpl(offered);
}

View File

@@ -9,6 +9,7 @@ import net.sf.briar.api.protocol.AuthorFactory;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupFactory;
import net.sf.briar.api.protocol.MessageFactory;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
@@ -16,7 +17,7 @@ import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.serial.StructReader;
import net.sf.briar.util.BoundedExecutor;
@@ -46,10 +47,10 @@ public class ProtocolModule extends AbstractModule {
bind(AuthorFactory.class).to(AuthorFactoryImpl.class);
bind(GroupFactory.class).to(GroupFactoryImpl.class);
bind(MessageFactory.class).to(MessageFactoryImpl.class);
bind(MessageVerifier.class).to(MessageVerifierImpl.class);
bind(PacketFactory.class).to(PacketFactoryImpl.class);
bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class);
bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class);
bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class);
// The executor is bounded, so tasks must be independent and short-lived
bind(Executor.class).annotatedWith(
VerificationExecutor.class).toInstance(
@@ -68,13 +69,6 @@ public class ProtocolModule extends AbstractModule {
return new AuthorReader(crypto, authorFactory);
}
@Provides
StructReader<UnverifiedBatch> getBatchReader(
StructReader<UnverifiedMessage> messageReader,
UnverifiedBatchFactory batchFactory) {
return new BatchReader(messageReader, batchFactory);
}
@Provides
StructReader<Group> getGroupReader(CryptoComponent crypto) {
return new GroupReader(crypto);

View File

@@ -9,9 +9,9 @@ import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.serial.StructReader;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.serial.ReaderFactory;
import net.sf.briar.api.serial.StructReader;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -20,7 +20,7 @@ class ProtocolReaderFactoryImpl implements ProtocolReaderFactory {
private final ReaderFactory readerFactory;
private final Provider<StructReader<Ack>> ackProvider;
private final Provider<StructReader<UnverifiedBatch>> batchProvider;
private final Provider<StructReader<UnverifiedMessage>> messageProvider;
private final Provider<StructReader<Offer>> offerProvider;
private final Provider<StructReader<Request>> requestProvider;
private final Provider<StructReader<SubscriptionUpdate>> subscriptionProvider;
@@ -29,14 +29,14 @@ class ProtocolReaderFactoryImpl implements ProtocolReaderFactory {
@Inject
ProtocolReaderFactoryImpl(ReaderFactory readerFactory,
Provider<StructReader<Ack>> ackProvider,
Provider<StructReader<UnverifiedBatch>> batchProvider,
Provider<StructReader<UnverifiedMessage>> messageProvider,
Provider<StructReader<Offer>> offerProvider,
Provider<StructReader<Request>> requestProvider,
Provider<StructReader<SubscriptionUpdate>> subscriptionProvider,
Provider<StructReader<TransportUpdate>> transportProvider) {
this.readerFactory = readerFactory;
this.ackProvider = ackProvider;
this.batchProvider = batchProvider;
this.messageProvider = messageProvider;
this.offerProvider = offerProvider;
this.requestProvider = requestProvider;
this.subscriptionProvider = subscriptionProvider;
@@ -45,7 +45,8 @@ class ProtocolReaderFactoryImpl implements ProtocolReaderFactory {
public ProtocolReader createProtocolReader(InputStream in) {
return new ProtocolReaderImpl(in, readerFactory, ackProvider.get(),
batchProvider.get(), offerProvider.get(), requestProvider.get(),
subscriptionProvider.get(), transportProvider.get());
messageProvider.get(), offerProvider.get(),
requestProvider.get(), subscriptionProvider.get(),
transportProvider.get());
}
}

View File

@@ -10,10 +10,10 @@ import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.Types;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.serial.StructReader;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.serial.Reader;
import net.sf.briar.api.serial.ReaderFactory;
import net.sf.briar.api.serial.StructReader;
class ProtocolReaderImpl implements ProtocolReader {
@@ -21,14 +21,14 @@ class ProtocolReaderImpl implements ProtocolReader {
ProtocolReaderImpl(InputStream in, ReaderFactory readerFactory,
StructReader<Ack> ackReader,
StructReader<UnverifiedBatch> batchReader,
StructReader<UnverifiedMessage> messageReader,
StructReader<Offer> offerReader,
StructReader<Request> requestReader,
StructReader<SubscriptionUpdate> subscriptionReader,
StructReader<TransportUpdate> transportReader) {
reader = readerFactory.createReader(in);
reader.addStructReader(Types.ACK, ackReader);
reader.addStructReader(Types.BATCH, batchReader);
reader.addStructReader(Types.MESSAGE, messageReader);
reader.addStructReader(Types.OFFER, offerReader);
reader.addStructReader(Types.REQUEST, requestReader);
reader.addStructReader(Types.SUBSCRIPTION_UPDATE, subscriptionReader);
@@ -47,12 +47,12 @@ class ProtocolReaderImpl implements ProtocolReader {
return reader.readStruct(Types.ACK, Ack.class);
}
public boolean hasBatch() throws IOException {
return reader.hasStruct(Types.BATCH);
public boolean hasMessage() throws IOException {
return reader.hasStruct(Types.MESSAGE);
}
public UnverifiedBatch readBatch() throws IOException {
return reader.readStruct(Types.BATCH, UnverifiedBatch.class);
public UnverifiedMessage readMessage() throws IOException {
return reader.readStruct(Types.MESSAGE, UnverifiedMessage.class);
}
public boolean hasOffer() throws IOException {

View File

@@ -8,13 +8,11 @@ import java.util.BitSet;
import java.util.Map.Entry;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolWriter;
import net.sf.briar.api.protocol.RawBatch;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Transport;
@@ -40,11 +38,11 @@ class ProtocolWriterImpl implements ProtocolWriter {
w = writerFactory.createWriter(out);
}
public int getMaxBatchesForAck(long capacity) {
public int getMaxMessagesForAck(long capacity) {
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
int overhead = serial.getSerialisedStructIdLength(Types.ACK)
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength();
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength();
int idLength = serial.getSerialisedUniqueIdLength();
return (packet - overhead) / idLength;
}
@@ -52,33 +50,22 @@ class ProtocolWriterImpl implements ProtocolWriter {
public int getMaxMessagesForOffer(long capacity) {
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
int overhead = serial.getSerialisedStructIdLength(Types.OFFER)
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength();
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength();
int idLength = serial.getSerialisedUniqueIdLength();
return (packet - overhead) / idLength;
}
public int getMessageCapacityForBatch(long capacity) {
int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH);
int overhead = serial.getSerialisedStructIdLength(Types.BATCH)
+ serial.getSerialisedListStartLength()
+ serial.getSerialisedListEndLength();
return packet - overhead;
}
public void writeAck(Ack a) throws IOException {
w.writeStructId(Types.ACK);
w.writeListStart();
for(BatchId b : a.getBatchIds()) w.writeBytes(b.getBytes());
for(MessageId m : a.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd();
if(flush) out.flush();
}
public void writeBatch(RawBatch b) throws IOException {
w.writeStructId(Types.BATCH);
w.writeListStart();
for(byte[] raw : b.getMessages()) out.write(raw);
w.writeListEnd();
public void writeMessage(byte[] raw) throws IOException {
out.write(raw);
if(flush) out.flush();
}
@@ -111,7 +98,7 @@ class ProtocolWriterImpl implements ProtocolWriter {
}
public void writeSubscriptionUpdate(SubscriptionUpdate s)
throws IOException {
throws IOException {
w.writeStructId(Types.SUBSCRIPTION_UPDATE);
// Holes
w.writeMapStart();

View File

@@ -1,25 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.RawBatch;
class RawBatchImpl implements RawBatch {
private final BatchId id;
private final Collection<byte[]> messages;
RawBatchImpl(BatchId id, Collection<byte[]> messages) {
this.id = id;
this.messages = messages;
}
public BatchId getId() {
return id;
}
public Collection<byte[]> getMessages() {
return messages;
}
}

View File

@@ -1,11 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.UnverifiedBatch;
interface UnverifiedBatchFactory {
UnverifiedBatch createUnverifiedBatch(
Collection<UnverifiedMessage> messages);
}

View File

@@ -1,23 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.protocol.UnverifiedBatch;
import com.google.inject.Inject;
class UnverifiedBatchFactoryImpl implements UnverifiedBatchFactory {
private final CryptoComponent crypto;
@Inject
UnverifiedBatchFactoryImpl(CryptoComponent crypto) {
this.crypto = crypto;
}
public UnverifiedBatch createUnverifiedBatch(
Collection<UnverifiedMessage> messages) {
return new UnverifiedBatchImpl(crypto, messages);
}
}

View File

@@ -1,32 +0,0 @@
package net.sf.briar.protocol;
import net.sf.briar.api.protocol.Author;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.MessageId;
interface UnverifiedMessage {
MessageId getParent();
Group getGroup();
Author getAuthor();
String getSubject();
long getTimestamp();
byte[] getRaw();
byte[] getAuthorSignature();
byte[] getGroupSignature();
int getBodyStart();
int getBodyLength();
int getLengthSignedByAuthor();
int getLengthSignedByGroup();
}

View File

@@ -3,6 +3,7 @@ package net.sf.briar.protocol;
import net.sf.briar.api.protocol.Author;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.UnverifiedMessage;
class UnverifiedMessageImpl implements UnverifiedMessage {
@@ -52,7 +53,7 @@ class UnverifiedMessageImpl implements UnverifiedMessage {
return timestamp;
}
public byte[] getRaw() {
public byte[] getSerialised() {
return raw;
}

View File

@@ -24,28 +24,28 @@ import net.sf.briar.api.FormatException;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.event.BatchReceivedEvent;
import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseEvent;
import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.db.event.MessagesAddedEvent;
import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.MessageReceivedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriter;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.RawBatch;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionReader;
@@ -76,6 +76,7 @@ abstract class DuplexConnection implements DatabaseListener {
protected final TransportId transportId;
private final Executor dbExecutor, verificationExecutor;
private final MessageVerifier messageVerifier;
private final AtomicBoolean canSendOffer, disposed;
private final BlockingQueue<Runnable> writerTasks;
@@ -85,7 +86,8 @@ abstract class DuplexConnection implements DatabaseListener {
DuplexConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionRegistry connRegistry,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
@@ -93,6 +95,7 @@ abstract class DuplexConnection implements DatabaseListener {
DuplexTransportConnection transport) {
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.connRegistry = connRegistry;
this.connReaderFactory = connReaderFactory;
@@ -115,12 +118,12 @@ abstract class DuplexConnection implements DatabaseListener {
throws IOException;
public void eventOccurred(DatabaseEvent e) {
if(e instanceof BatchReceivedEvent) {
if(e instanceof MessageReceivedEvent) {
dbExecutor.execute(new GenerateAcks());
} else if(e instanceof ContactRemovedEvent) {
ContactId c = ((ContactRemovedEvent) e).getContactId();
if(contactId.equals(c)) dispose(false, true);
} else if(e instanceof MessagesAddedEvent) {
} else if(e instanceof MessageAddedEvent) {
if(canSendOffer.getAndSet(false))
dbExecutor.execute(new GenerateOffer());
} else if(e instanceof SubscriptionsUpdatedEvent) {
@@ -142,9 +145,9 @@ abstract class DuplexConnection implements DatabaseListener {
if(reader.hasAck()) {
Ack a = reader.readAck();
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasBatch()) {
UnverifiedBatch b = reader.readBatch();
verificationExecutor.execute(new VerifyBatch(b));
} else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage();
verificationExecutor.execute(new VerifyMessage(m));
} else if(reader.hasOffer()) {
Offer o = reader.readOffer();
dbExecutor.execute(new ReceiveOffer(o));
@@ -260,18 +263,18 @@ abstract class DuplexConnection implements DatabaseListener {
}
// This task runs on a verification thread
private class VerifyBatch implements Runnable {
private class VerifyMessage implements Runnable {
private final UnverifiedBatch batch;
private final UnverifiedMessage message;
private VerifyBatch(UnverifiedBatch batch) {
this.batch = batch;
private VerifyMessage(UnverifiedMessage message) {
this.message = message;
}
public void run() {
try {
Batch b = batch.verify();
dbExecutor.execute(new ReceiveBatch(b));
Message m = messageVerifier.verifyMessage(message);
dbExecutor.execute(new ReceiveMessage(m));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -279,17 +282,17 @@ abstract class DuplexConnection implements DatabaseListener {
}
// This task runs on a database thread
private class ReceiveBatch implements Runnable {
private class ReceiveMessage implements Runnable {
private final Batch batch;
private final Message message;
private ReceiveBatch(Batch batch) {
this.batch = batch;
private ReceiveMessage(Message message) {
this.message = message;
}
public void run() {
try {
db.receiveBatch(contactId, batch);
db.receiveMessage(contactId, message);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -394,9 +397,9 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
assert writer != null;
int maxBatches = writer.getMaxBatchesForAck(Long.MAX_VALUE);
int maxMessages = writer.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxBatches);
Ack a = db.generateAck(contactId, maxMessages);
if(a != null) writerTasks.add(new WriteAck(a));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -436,11 +439,11 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
assert writer != null;
int capacity = writer.getMessageCapacityForBatch(Long.MAX_VALUE);
try {
RawBatch b = db.generateBatch(contactId, capacity, requested);
if(b == null) new GenerateOffer().run();
else writerTasks.add(new WriteBatch(b, requested));
Collection<byte[]> batch = db.generateBatch(contactId,
Integer.MAX_VALUE, requested);
if(batch == null) new GenerateOffer().run();
else writerTasks.add(new WriteBatch(batch, requested));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -450,10 +453,11 @@ abstract class DuplexConnection implements DatabaseListener {
// This task runs on the writer thread
private class WriteBatch implements Runnable {
private final RawBatch batch;
private final Collection<byte[]> batch;
private final Collection<MessageId> requested;
private WriteBatch(RawBatch batch, Collection<MessageId> requested) {
private WriteBatch(Collection<byte[]> batch,
Collection<MessageId> requested) {
this.batch = batch;
this.requested = requested;
}
@@ -461,7 +465,7 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
assert writer != null;
try {
writer.writeBatch(batch);
for(byte[] raw : batch) writer.writeMessage(raw);
if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer());
else dbExecutor.execute(new GenerateBatches(requested));
} catch(IOException e) {

View File

@@ -10,6 +10,7 @@ import net.sf.briar.api.crypto.KeyManager;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.TransportId;
@@ -28,6 +29,7 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
Logger.getLogger(DuplexConnectionFactoryImpl.class.getName());
private final Executor dbExecutor, verificationExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final KeyManager keyManager;
private final ConnectionRegistry connRegistry;
@@ -39,13 +41,14 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
@Inject
DuplexConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, KeyManager keyManager,
ConnectionRegistry connRegistry,
MessageVerifier messageVerifier, DatabaseComponent db,
KeyManager keyManager, ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory) {
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.keyManager = keyManager;
this.connRegistry = connRegistry;
@@ -58,9 +61,9 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
public void createIncomingConnection(ConnectionContext ctx,
DuplexTransportConnection transport) {
final DuplexConnection conn = new IncomingDuplexConnection(dbExecutor,
verificationExecutor, db, connRegistry, connReaderFactory,
connWriterFactory, protoReaderFactory, protoWriterFactory, ctx,
transport);
verificationExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, protoReaderFactory,
protoWriterFactory, ctx, transport);
Runnable write = new Runnable() {
public void run() {
conn.write();
@@ -84,9 +87,9 @@ class DuplexConnectionFactoryImpl implements DuplexConnectionFactory {
return;
}
final DuplexConnection conn = new OutgoingDuplexConnection(dbExecutor,
verificationExecutor, db, connRegistry, connReaderFactory,
connWriterFactory, protoReaderFactory, protoWriterFactory, ctx,
transport);
verificationExecutor, messageVerifier, db, connRegistry,
connReaderFactory, connWriterFactory, protoReaderFactory,
protoWriterFactory, ctx, transport);
Runnable write = new Runnable() {
public void run() {
conn.write();

View File

@@ -6,6 +6,7 @@ import java.util.concurrent.Executor;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.VerificationExecutor;
@@ -20,15 +21,16 @@ class IncomingDuplexConnection extends DuplexConnection {
IncomingDuplexConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionRegistry connRegistry,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory,
ConnectionContext ctx, DuplexTransportConnection transport) {
super(dbExecutor, verificationExecutor, db, connRegistry,
connReaderFactory, connWriterFactory, protoReaderFactory,
protoWriterFactory, ctx, transport);
super(dbExecutor, verificationExecutor, messageVerifier, db,
connRegistry, connReaderFactory, connWriterFactory,
protoReaderFactory, protoWriterFactory, ctx, transport);
}
@Override

View File

@@ -6,6 +6,7 @@ import java.util.concurrent.Executor;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.VerificationExecutor;
@@ -20,15 +21,16 @@ class OutgoingDuplexConnection extends DuplexConnection {
OutgoingDuplexConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionRegistry connRegistry,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ConnectionContext ctx,
DuplexTransportConnection transport) {
super(dbExecutor, verificationExecutor, db, connRegistry,
connReaderFactory, connWriterFactory, protoReaderFactory,
protoWriterFactory, ctx, transport);
super(dbExecutor, verificationExecutor, messageVerifier, db,
connRegistry, connReaderFactory, connWriterFactory,
protoReaderFactory, protoWriterFactory, ctx, transport);
}
@Override

View File

@@ -15,13 +15,14 @@ import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.plugins.simplex.SimplexTransportReader;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.protocol.VerificationExecutor;
import net.sf.briar.api.transport.ConnectionContext;
import net.sf.briar.api.transport.ConnectionReader;
@@ -35,6 +36,7 @@ class IncomingSimplexConnection {
Logger.getLogger(IncomingSimplexConnection.class.getName());
private final Executor dbExecutor, verificationExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final ConnectionRegistry connRegistry;
private final ConnectionReaderFactory connFactory;
@@ -46,12 +48,14 @@ class IncomingSimplexConnection {
IncomingSimplexConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, ConnectionRegistry connRegistry,
MessageVerifier messageVerifier, DatabaseComponent db,
ConnectionRegistry connRegistry,
ConnectionReaderFactory connFactory,
ProtocolReaderFactory protoFactory, ConnectionContext ctx,
SimplexTransportReader transport) {
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.connRegistry = connRegistry;
this.connFactory = connFactory;
@@ -74,9 +78,9 @@ class IncomingSimplexConnection {
if(reader.hasAck()) {
Ack a = reader.readAck();
dbExecutor.execute(new ReceiveAck(a));
} else if(reader.hasBatch()) {
UnverifiedBatch b = reader.readBatch();
verificationExecutor.execute(new VerifyBatch(b));
} else if(reader.hasMessage()) {
UnverifiedMessage m = reader.readMessage();
verificationExecutor.execute(new VerifyMessage(m));
} else if(reader.hasSubscriptionUpdate()) {
SubscriptionUpdate s = reader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(s));
@@ -122,35 +126,35 @@ class IncomingSimplexConnection {
}
}
private class VerifyBatch implements Runnable {
private class VerifyMessage implements Runnable {
private final UnverifiedBatch batch;
private final UnverifiedMessage message;
private VerifyBatch(UnverifiedBatch batch) {
this.batch = batch;
private VerifyMessage(UnverifiedMessage message) {
this.message = message;
}
public void run() {
try {
Batch b = batch.verify();
dbExecutor.execute(new ReceiveBatch(b));
Message m = messageVerifier.verifyMessage(message);
dbExecutor.execute(new ReceiveMessage(m));
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
}
}
private class ReceiveBatch implements Runnable {
private class ReceiveMessage implements Runnable {
private final Batch batch;
private final Message message;
private ReceiveBatch(Batch batch) {
this.batch = batch;
private ReceiveMessage(Message message) {
this.message = message;
}
public void run() {
try {
db.receiveBatch(contactId, batch);
db.receiveMessage(contactId, message);
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}

View File

@@ -6,6 +6,7 @@ import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
@@ -15,7 +16,6 @@ import net.sf.briar.api.plugins.simplex.SimplexTransportWriter;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.ProtocolWriter;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.RawBatch;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate;
@@ -77,23 +77,23 @@ class OutgoingSimplexConnection {
}
// Write acks until you can't write acks no more
capacity = conn.getRemainingCapacity();
int maxBatches = writer.getMaxBatchesForAck(capacity);
Ack a = db.generateAck(contactId, maxBatches);
int maxMessages = writer.getMaxMessagesForAck(capacity);
Ack a = db.generateAck(contactId, maxMessages);
while(a != null) {
writer.writeAck(a);
capacity = conn.getRemainingCapacity();
maxBatches = writer.getMaxBatchesForAck(capacity);
a = db.generateAck(contactId, maxBatches);
maxMessages = writer.getMaxMessagesForAck(capacity);
a = db.generateAck(contactId, maxMessages);
}
// Write batches until you can't write batches no more
// Write messages until you can't write messages no more
capacity = conn.getRemainingCapacity();
capacity = writer.getMessageCapacityForBatch(capacity);
RawBatch b = db.generateBatch(contactId, (int) capacity);
while(b != null) {
writer.writeBatch(b);
int maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
Collection<byte[]> batch = db.generateBatch(contactId, maxLength);
while(batch != null) {
for(byte[] raw : batch) writer.writeMessage(raw);
capacity = conn.getRemainingCapacity();
capacity = writer.getMessageCapacityForBatch(capacity);
b = db.generateBatch(contactId, (int) capacity);
maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
batch = db.generateBatch(contactId, maxLength);
}
writer.flush();
writer.close();

View File

@@ -11,6 +11,7 @@ import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.plugins.simplex.SimplexTransportReader;
import net.sf.briar.api.plugins.simplex.SimplexTransportWriter;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.TransportId;
@@ -29,6 +30,7 @@ class SimplexConnectionFactoryImpl implements SimplexConnectionFactory {
Logger.getLogger(SimplexConnectionFactoryImpl.class.getName());
private final Executor dbExecutor, verificationExecutor;
private final MessageVerifier messageVerifier;
private final DatabaseComponent db;
private final KeyManager keyManager;
private final ConnectionRegistry connRegistry;
@@ -40,14 +42,15 @@ class SimplexConnectionFactoryImpl implements SimplexConnectionFactory {
@Inject
SimplexConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor,
DatabaseComponent db, KeyManager keyManager,
ConnectionRegistry connRegistry,
MessageVerifier messageVerifier, DatabaseComponent db,
KeyManager keyManager, ConnectionRegistry connRegistry,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory) {
this.dbExecutor = dbExecutor;
this.verificationExecutor = verificationExecutor;
this.messageVerifier = messageVerifier;
this.db = db;
this.keyManager = keyManager;
this.connRegistry = connRegistry;
@@ -59,8 +62,8 @@ class SimplexConnectionFactoryImpl implements SimplexConnectionFactory {
public void createIncomingConnection(ConnectionContext ctx, SimplexTransportReader r) {
final IncomingSimplexConnection conn = new IncomingSimplexConnection(
dbExecutor, verificationExecutor, db, connRegistry,
connReaderFactory, protoReaderFactory, ctx, r);
dbExecutor, verificationExecutor, messageVerifier, db,
connRegistry, connReaderFactory, protoReaderFactory, ctx, r);
Runnable read = new Runnable() {
public void run() {
conn.read();