Bundles are gone - the batch-mode and stream-mode protocols now

consist of independently encrypted and authenticated packets (Ack,
Batch, Subscriptions and Transports so far).
This commit is contained in:
akwizgran
2011-07-22 22:19:24 +01:00
parent 5d000b62f8
commit de648daca5
42 changed files with 1309 additions and 1716 deletions

View File

@@ -1,7 +1,7 @@
package net.sf.briar.db;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
@@ -97,8 +97,8 @@ interface Database<T> {
* <p>
* Locking: contacts read, messages read, messageStatuses write.
*/
void addOutstandingBatch(T txn, ContactId c, BatchId b, Set<MessageId> sent)
throws DbException;
void addOutstandingBatch(T txn, ContactId c, BatchId b,
Collection<MessageId> sent) throws DbException;
/**
* Subscribes to the given group.
@@ -128,12 +128,20 @@ interface Database<T> {
*/
boolean containsSubscription(T txn, GroupId g) throws DbException;
/**
* Returns the IDs of any batches received from the given contact that need
* to be acknowledged.
* <p>
* Locking: contacts read, messageStatuses write.
*/
Collection<BatchId> getBatchesToAck(T txn, ContactId c) throws DbException;
/**
* Returns the IDs of all contacts.
* <p>
* Locking: contacts read.
*/
Set<ContactId> getContacts(T txn) throws DbException;
Collection<ContactId> getContacts(T txn) throws DbException;
/**
* Returns the amount of free storage space available to the database, in
@@ -157,7 +165,7 @@ interface Database<T> {
* <p>
* Locking: contacts read, messages read, messageStatuses read.
*/
Set<BatchId> getLostBatches(T txn, ContactId c) throws DbException;
Collection<BatchId> getLostBatches(T txn, ContactId c) throws DbException;
/**
* Returns the message identified by the given ID, in raw format.
@@ -171,7 +179,7 @@ interface Database<T> {
* <p>
* Locking: messages read.
*/
Iterable<MessageId> getMessagesByAuthor(T txn, AuthorId a)
Collection<MessageId> getMessagesByAuthor(T txn, AuthorId a)
throws DbException;
/**
@@ -188,7 +196,7 @@ interface Database<T> {
* <p>
* Locking: messages read.
*/
Iterable<MessageId> getOldMessages(T txn, long size) throws DbException;
Collection<MessageId> getOldMessages(T txn, long size) throws DbException;
/**
* Returns the parent of the given message.
@@ -219,7 +227,7 @@ interface Database<T> {
* <p>
* Locking: contacts read, messages read, messageStatuses read.
*/
Iterable<MessageId> getSendableMessages(T txn, ContactId c, long capacity)
Collection<MessageId> getSendableMessages(T txn, ContactId c, int capacity)
throws DbException;
/**
@@ -227,14 +235,14 @@ interface Database<T> {
* <p>
* Locking: subscriptions read.
*/
Set<GroupId> getSubscriptions(T txn) throws DbException;
Collection<GroupId> getSubscriptions(T txn) throws DbException;
/**
* Returns the groups to which the given contact subscribes.
* <p>
* Locking: contacts read, subscriptions read.
*/
Set<GroupId> getSubscriptions(T txn, ContactId c) throws DbException;
Collection<GroupId> getSubscriptions(T txn, ContactId c) throws DbException;
/**
* Returns the local transport details.
@@ -260,12 +268,11 @@ interface Database<T> {
void removeAckedBatch(T txn, ContactId c, BatchId b) throws DbException;
/**
* Removes and returns the IDs of any batches received from the given
* contact that need to be acknowledged.
* <p>
* Locking: contacts read, messageStatuses write.
* Marks the given batches received from the given contact as having been
* acknowledged.
*/
Set<BatchId> removeBatchesToAck(T txn, ContactId c) throws DbException;
void removeBatchesToAck(T txn, ContactId c, Collection<BatchId> sent)
throws DbException;
/**
* Removes a contact (and all associated state) from the database.
@@ -328,8 +335,8 @@ interface Database<T> {
* <p>
* Locking: contacts write, subscriptions write.
*/
void setSubscriptions(T txn, ContactId c, Set<GroupId> subs, long timestamp)
throws DbException;
void setSubscriptions(T txn, ContactId c, Collection<GroupId> subs,
long timestamp) throws DbException;
/**
* Sets the local transport details, replacing any existing transport

View File

@@ -9,12 +9,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -457,7 +455,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
public void addOutstandingBatch(Connection txn, ContactId c, BatchId b,
Set<MessageId> sent) throws DbException {
Collection<MessageId> sent) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -611,14 +609,37 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Set<ContactId> getContacts(Connection txn) throws DbException {
public Collection<BatchId> getBatchesToAck(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT batchId FROM batchesToAck"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
Collection<BatchId> ids = new ArrayList<BatchId>();
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
rs.close();
ps.close();
return ids;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public Collection<ContactId> getContacts(Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT contactId FROM contacts";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Set<ContactId> ids = new HashSet<ContactId>();
Collection<ContactId> ids = new ArrayList<ContactId>();
while(rs.next()) ids.add(new ContactId(rs.getInt(1)));
rs.close();
ps.close();
@@ -663,7 +684,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Set<BatchId> getLostBatches(Connection txn, ContactId c)
public Collection<BatchId> getLostBatches(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -674,7 +695,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setInt(1, c.getInt());
ps.setInt(2, RETRANSMIT_THRESHOLD);
rs = ps.executeQuery();
Set<BatchId> ids = new HashSet<BatchId>();
Collection<BatchId> ids = new ArrayList<BatchId>();
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
rs.close();
ps.close();
@@ -714,7 +735,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Iterable<MessageId> getMessagesByAuthor(Connection txn, AuthorId a)
public Collection<MessageId> getMessagesByAuthor(Connection txn, AuthorId a)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -723,7 +744,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps = txn.prepareStatement(sql);
ps.setBytes(1, a.getBytes());
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
Collection<MessageId> ids = new ArrayList<MessageId>();
while(rs.next()) ids.add(new MessageId(rs.getBytes(1)));
rs.close();
ps.close();
@@ -799,7 +820,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Iterable<MessageId> getOldMessages(Connection txn, long capacity)
public Collection<MessageId> getOldMessages(Connection txn, long capacity)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -808,7 +829,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " ORDER BY timestamp";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
Collection<MessageId> ids = new ArrayList<MessageId>();
long total = 0L;
while(rs.next()) {
int size = rs.getInt(1);
@@ -901,8 +922,8 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Iterable<MessageId> getSendableMessages(Connection txn,
ContactId c, long capacity) throws DbException {
public Collection<MessageId> getSendableMessages(Connection txn,
ContactId c, int capacity) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
@@ -919,8 +940,8 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.setInt(2, c.getInt());
ps.setShort(3, (short) Status.NEW.ordinal());
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<MessageId>();
long total = 0;
Collection<MessageId> ids = new ArrayList<MessageId>();
int total = 0;
while(rs.next()) {
int size = rs.getInt(1);
if(total + size > capacity) break;
@@ -943,14 +964,15 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Set<GroupId> getSubscriptions(Connection txn) throws DbException {
public Collection<GroupId> getSubscriptions(Connection txn)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT groupId FROM localSubscriptions";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Set<GroupId> ids = new HashSet<GroupId>();
Collection<GroupId> ids = new ArrayList<GroupId>();
while(rs.next()) ids.add(new GroupId(rs.getBytes(1)));
rs.close();
ps.close();
@@ -963,7 +985,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Set<GroupId> getSubscriptions(Connection txn, ContactId c)
public Collection<GroupId> getSubscriptions(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -973,7 +995,7 @@ abstract class JdbcDatabase implements Database<Connection> {
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
Set<GroupId> ids = new HashSet<GroupId>();
Collection<GroupId> ids = new ArrayList<GroupId>();
while(rs.next()) ids.add(new GroupId(rs.getBytes(1)));
rs.close();
ps.close();
@@ -1114,29 +1136,25 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Set<BatchId> removeBatchesToAck(Connection txn, ContactId c)
throws DbException {
public void removeBatchesToAck(Connection txn, ContactId c,
Collection<BatchId> sent) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT batchId FROM batchesToAck"
+ " WHERE contactId = ?";
String sql = "DELETE FROM batchesToAck"
+ " WHERE contactId = ? and batchId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
Set<BatchId> ids = new HashSet<BatchId>();
while(rs.next()) ids.add(new BatchId(rs.getBytes(1)));
rs.close();
for(BatchId b : sent) {
ps.setBytes(2, b.getBytes());
ps.addBatch();
}
int[] rowsAffectedArray = ps.executeBatch();
assert rowsAffectedArray.length == sent.size();
for(int i = 0; i < rowsAffectedArray.length; i++) {
assert rowsAffectedArray[i] == 1;
}
ps.close();
sql = "DELETE FROM batchesToAck WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
int rowsAffected = ps.executeUpdate();
assert rowsAffected == ids.size();
ps.close();
return ids;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
@@ -1310,8 +1328,8 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setSubscriptions(Connection txn, ContactId c, Set<GroupId> subs,
long timestamp) throws DbException {
public void setSubscriptions(Connection txn, ContactId c,
Collection<GroupId> subs, long timestamp) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {

View File

@@ -1,12 +1,10 @@
package net.sf.briar.db;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SignatureException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -17,17 +15,19 @@ import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AckWriter;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.BundleReader;
import net.sf.briar.api.protocol.BundleWriter;
import net.sf.briar.api.protocol.BatchWriter;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.serial.Raw;
import net.sf.briar.api.serial.RawByteArray;
import net.sf.briar.api.protocol.SubscriptionWriter;
import net.sf.briar.api.protocol.Subscriptions;
import net.sf.briar.api.protocol.TransportWriter;
import net.sf.briar.api.protocol.Transports;
import com.google.inject.Inject;
@@ -63,6 +63,34 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
super(db, cleaner);
}
protected void expireMessages(long size) throws DbException {
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void close() throws DbException {
cleaner.stopCleaning();
contactLock.writeLock().lock();
@@ -162,442 +190,9 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
protected void expireMessages(long size) throws DbException {
contactLock.readLock().lock();
try {
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void generateBundle(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
Set<BatchId> acks = generateAcks(c);
Set<GroupId> subs = generateSubscriptions(c);
Map<String, String> transports = generateTransports(c);
// Add the header to the bundle
b.addHeader(acks, subs, transports);
// Add as many messages as possible to the bundle
while(generateBatch(c, b));
b.finish();
if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated");
System.gc();
}
private Set<BatchId> generateAcks(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
Set<BatchId> acks = db.removeBatchesToAck(txn, c);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + acks.size() + " acks");
db.commitTransaction(txn);
return acks;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
private Set<GroupId> generateSubscriptions(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = db.getSubscriptions(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
private Map<String, String> generateTransports(ContactId c)
throws DbException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
private boolean generateBatch(ContactId c, BundleWriter b)
throws DbException, IOException, GeneralSecurityException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
Set<MessageId> sent;
int bytesSent = 0;
BatchId batchId;
messageStatusLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
long capacity =
Math.min(b.getRemainingCapacity(), Batch.MAX_SIZE);
Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator();
if(!it.hasNext()) {
db.commitTransaction(txn);
return false; // No more messages to send
}
sent = new HashSet<MessageId>();
List<Raw> messages = new ArrayList<Raw>();
while(it.hasNext()) {
MessageId m = it.next();
byte[] message = db.getMessage(txn, m);
bytesSent += message.length;
messages.add(new RawByteArray(message));
sent.add(m);
}
batchId = b.addBatch(messages);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} catch(SignatureException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.readLock().unlock();
}
// Record the contents of the batch
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
assert !sent.isEmpty();
db.addOutstandingBatch(txn, c, batchId, sent);
db.commitTransaction(txn);
// Don't create another batch if this one was half-empty
return bytesSent > Batch.MAX_SIZE / 2;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public Set<ContactId> getContacts() throws DbException {
contactLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Set<ContactId> contacts = db.getContacts(txn);
db.commitTransaction(txn);
return contacts;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
contactLock.readLock().unlock();
}
}
public Rating getRating(AuthorId a) throws DbException {
ratingLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Rating r = db.getRating(txn, a);
db.commitTransaction(txn);
return r;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
ratingLock.readLock().unlock();
}
}
public Set<GroupId> getSubscriptions() throws DbException {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = db.getSubscriptions(txn);
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
}
public Map<String, String> getTransports() throws DbException {
transportLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
}
public Map<String, String> getTransports(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn, c);
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveBundle(ContactId c, BundleReader b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c);
Header h = b.getHeader();
receiveAcks(c, h);
receiveSubscriptions(c, h);
receiveTransports(c, h);
// Store the messages
int batches = 0;
Batch batch = null;
while((batch = b.getNextBatch()) != null) {
receiveBatch(c, batch);
batches++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
b.finish();
findLostBatches(c);
System.gc();
}
private void receiveAcks(ContactId c, Header h) throws DbException {
// Mark all messages in acked batches as seen
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Set<BatchId> acks = h.getAcks();
for(BatchId ack : acks) {
Txn txn = db.startTransaction();
try {
db.removeAckedBatch(txn, c, ack);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + acks.size() + " acks");
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
private void receiveSubscriptions(ContactId c, Header h)
throws DbException {
// Update the contact's subscriptions
contactLock.writeLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
subscriptionLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = h.getSubscriptions();
db.setSubscriptions(txn, c, subs, h.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + subs.size() + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
}
}
private void receiveTransports(ContactId c, Header h) throws DbException {
// Update the contact's transport details
contactLock.writeLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = h.getTransports();
db.setTransports(txn, c, transports, h.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + transports.size()
+ " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
}
}
private void receiveBatch(ContactId c, Batch b) throws DbException {
waitForPermissionToWrite();
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : b.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsSubscription(txn, g)) {
if(storeMessage(txn, m, c)) stored++;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, c, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
private void findLostBatches(ContactId c) throws DbException {
public void findLostBatches(ContactId c) throws DbException {
// Find any lost batches that need to be retransmitted
Set<BatchId> lost;
Collection<BatchId> lost;
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -652,6 +247,379 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public void generateAck(ContactId c, AckWriter a) throws DbException,
IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
Collection<BatchId> acks = db.getBatchesToAck(txn, c);
Collection<BatchId> sent = new ArrayList<BatchId>();
for(BatchId b : acks) if(a.addBatchId(b)) sent.add(b);
a.finish();
db.removeBatchesToAck(txn, c, sent);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + acks.size() + " acks");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void generateBatch(ContactId c, BatchWriter b) throws DbException,
IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
Set<MessageId> sent;
int bytesSent = 0;
messageStatusLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
int capacity = b.getCapacity();
Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator();
sent = new HashSet<MessageId>();
while(it.hasNext()) {
MessageId m = it.next();
byte[] message = db.getMessage(txn, m);
if(!b.addMessage(message)) break;
bytesSent += message.length;
sent.add(m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.readLock().unlock();
}
BatchId id = b.finish();
// Record the contents of the batch, unless it was empty
if(sent.isEmpty()) return;
messageStatusLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void generateSubscriptions(ContactId c, SubscriptionWriter s)
throws DbException, IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
// FIXME: This should deal in Groups, not GroupIds
Collection<GroupId> subs = db.getSubscriptions(txn);
s.setSubscriptions(subs);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void generateTransports(ContactId c, TransportWriter t)
throws DbException, IOException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
t.setTransports(transports);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public Collection<ContactId> getContacts() throws DbException {
contactLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Collection<ContactId> contacts = db.getContacts(txn);
db.commitTransaction(txn);
return contacts;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
contactLock.readLock().unlock();
}
}
public Rating getRating(AuthorId a) throws DbException {
ratingLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Rating r = db.getRating(txn, a);
db.commitTransaction(txn);
return r;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
ratingLock.readLock().unlock();
}
}
public Collection<GroupId> getSubscriptions() throws DbException {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Collection<GroupId> subs = db.getSubscriptions(txn);
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
}
public Map<String, String> getTransports() throws DbException {
transportLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
}
public Map<String, String> getTransports(ContactId c) throws DbException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn, c);
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveAck(ContactId c, Ack a) throws DbException {
// Mark all messages in acked batches as seen
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.readLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
Collection<BatchId> acks = a.getBatches();
for(BatchId ack : acks) {
Txn txn = db.startTransaction();
try {
db.removeAckedBatch(txn, c, ack);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + acks.size() + " acks");
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveBatch(ContactId c, Batch b) throws DbException {
waitForPermissionToWrite();
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
messageLock.writeLock().lock();
try {
messageStatusLock.writeLock().lock();
try {
subscriptionLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
int received = 0, stored = 0;
for(Message m : b.getMessages()) {
received++;
GroupId g = m.getGroup();
if(db.containsSubscription(txn, g)) {
if(storeMessage(txn, m, c)) stored++;
}
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + received
+ " messages, stored " + stored);
db.addBatchToAck(txn, c, b.getId());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
}
} finally {
messageStatusLock.writeLock().unlock();
}
} finally {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveSubscriptions(ContactId c, Subscriptions s)
throws DbException {
// Update the contact's subscriptions
contactLock.writeLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
subscriptionLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
Collection<GroupId> subs = s.getSubscriptions();
db.setSubscriptions(txn, c, subs, s.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + subs.size() + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
}
}
public void receiveTransports(ContactId c, Transports t)
throws DbException {
// Update the contact's transport details
contactLock.writeLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = t.getTransports();
db.setTransports(txn, c, transports, t.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + transports.size()
+ " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.writeLock().unlock();
}
} finally {
contactLock.writeLock().unlock();
}
}
public void removeContact(ContactId c) throws DbException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing contact " + c);
contactLock.writeLock().lock();

View File

@@ -1,12 +1,10 @@
package net.sf.briar.db;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SignatureException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
@@ -16,17 +14,19 @@ import net.sf.briar.api.ContactId;
import net.sf.briar.api.Rating;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.NoSuchContactException;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AckWriter;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.BundleReader;
import net.sf.briar.api.protocol.BundleWriter;
import net.sf.briar.api.protocol.BatchWriter;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.serial.Raw;
import net.sf.briar.api.serial.RawByteArray;
import net.sf.briar.api.protocol.SubscriptionWriter;
import net.sf.briar.api.protocol.Subscriptions;
import net.sf.briar.api.protocol.TransportWriter;
import net.sf.briar.api.protocol.Transports;
import com.google.inject.Inject;
@@ -56,6 +56,25 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
super(db, cleaner);
}
protected void expireMessages(long size) throws DbException {
synchronized(contactLock) {
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
public void close() throws DbException {
cleaner.stopCleaning();
synchronized(contactLock) {
@@ -124,15 +143,16 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
protected void expireMessages(long size) throws DbException {
public void findLostBatches(ContactId c) throws DbException {
// Find any lost batches that need to be retransmitted
Collection<BatchId> lost;
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
for(MessageId m : db.getOldMessages(txn, size)) {
removeMessage(txn, m);
}
lost = db.getLostBatches(txn, c);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -141,134 +161,142 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
for(BatchId batch : lost) {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Removing lost batch");
db.removeLostBatch(txn, c, batch);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
}
public void generateBundle(ContactId c, BundleWriter b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c);
Set<BatchId> acks = generateAcks(c);
Set<GroupId> subs = generateSubscriptions(c);
Map<String, String> transports = generateTransports(c);
// Add the header to the bundle
b.addHeader(acks, subs, transports);
// Add as many messages as possible to the bundle
while(generateBatch(c, b));
b.finish();
if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated");
System.gc();
}
private Set<BatchId> generateAcks(ContactId c) throws DbException {
public void generateAck(ContactId c, AckWriter a) throws DbException,
IOException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
Set<BatchId> acks = db.removeBatchesToAck(txn, c);
Collection<BatchId> acks = db.getBatchesToAck(txn, c);
Collection<BatchId> sent = new ArrayList<BatchId>();
for(BatchId b : acks) if(a.addBatchId(b)) sent.add(b);
a.finish();
db.removeBatchesToAck(txn, c, sent);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + acks.size() + " acks");
db.commitTransaction(txn);
return acks;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private Set<GroupId> generateSubscriptions(ContactId c) throws DbException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = db.getSubscriptions(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private Map<String, String> generateTransports(ContactId c)
throws DbException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private boolean generateBatch(ContactId c, BundleWriter b)
throws DbException, IOException, GeneralSecurityException {
public void generateBatch(ContactId c, BatchWriter b) throws DbException,
IOException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
long capacity =
Math.min(b.getRemainingCapacity(), Batch.MAX_SIZE);
int capacity = b.getCapacity();
Iterator<MessageId> it =
db.getSendableMessages(txn, c, capacity).iterator();
if(!it.hasNext()) {
db.commitTransaction(txn);
return false; // No more messages to send
}
Set<MessageId> sent = new HashSet<MessageId>();
List<Raw> messages = new ArrayList<Raw>();
int bytesSent = 0;
while(it.hasNext()) {
MessageId m = it.next();
byte[] message = db.getMessage(txn, m);
if(!b.addMessage(message)) break;
bytesSent += message.length;
messages.add(new RawByteArray(message));
sent.add(m);
}
BatchId batchId = b.addBatch(messages);
// Record the contents of the batch
assert !sent.isEmpty();
db.addOutstandingBatch(txn, c, batchId, sent);
BatchId id = b.finish();
// Record the contents of the batch, unless it was empty
if(!sent.isEmpty())
db.addOutstandingBatch(txn, c, id, sent);
db.commitTransaction(txn);
// Don't create another batch if this one was half-empty
return bytesSent > Batch.MAX_SIZE / 2;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
} catch(SignatureException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
}
public Set<ContactId> getContacts() throws DbException {
public void generateSubscriptions(ContactId c, SubscriptionWriter s)
throws DbException, IOException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
// FIXME: This should deal in Groups, not GroupIds
Collection<GroupId> subs = db.getSubscriptions(txn);
s.setSubscriptions(subs);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + subs.size() + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public void generateTransports(ContactId c, TransportWriter t)
throws DbException, IOException {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = db.getTransports(txn);
t.setTransports(transports);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + transports.size() + " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
} catch(IOException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
public Collection<ContactId> getContacts() throws DbException {
synchronized(contactLock) {
Txn txn = db.startTransaction();
try {
Set<ContactId> contacts = db.getContacts(txn);
Collection<ContactId> contacts = db.getContacts(txn);
db.commitTransaction(txn);
return contacts;
} catch(DbException e) {
@@ -292,11 +320,11 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public Set<GroupId> getSubscriptions() throws DbException {
public Collection<GroupId> getSubscriptions() throws DbException {
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = db.getSubscriptions(txn);
Collection<GroupId> subs = db.getSubscriptions(txn);
db.commitTransaction(txn);
return subs;
} catch(DbException e) {
@@ -337,34 +365,13 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
public void receiveBundle(ContactId c, BundleReader b) throws DbException,
IOException, GeneralSecurityException {
if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c);
Header h = b.getHeader();
receiveAcks(c, h);
receiveSubscriptions(c, h);
receiveTransports(c, h);
// Store the messages
int batches = 0;
Batch batch = null;
while((batch = b.getNextBatch()) != null) {
receiveBatch(c, batch);
batches++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + batches + " batches");
b.finish();
findLostBatches(c);
System.gc();
}
private void receiveAcks(ContactId c, Header h) throws DbException {
public void receiveAck(ContactId c, Ack a) throws DbException {
// Mark all messages in acked batches as seen
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Set<BatchId> acks = h.getAcks();
Collection<BatchId> acks = a.getBatches();
for(BatchId ack : acks) {
Txn txn = db.startTransaction();
try {
@@ -382,49 +389,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
private void receiveSubscriptions(ContactId c, Header h)
throws DbException {
// Update the contact's subscriptions
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Set<GroupId> subs = h.getSubscriptions();
db.setSubscriptions(txn, c, subs, h.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + subs.size() + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private void receiveTransports(ContactId c, Header h) throws DbException {
// Update the contact's transport details
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = h.getTransports();
db.setTransports(txn, c, transports, h.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + transports.size()
+ " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
}
private void receiveBatch(ContactId c, Batch b) throws DbException {
public void receiveBatch(ContactId c, Batch b) throws DbException {
waitForPermissionToWrite();
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
@@ -456,41 +421,44 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
private void findLostBatches(ContactId c)
public void receiveSubscriptions(ContactId c, Subscriptions s)
throws DbException {
// Find any lost batches that need to be retransmitted
Set<BatchId> lost;
// Update the contact's subscriptions
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
lost = db.getLostBatches(txn, c);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
synchronized(subscriptionLock) {
Txn txn = db.startTransaction();
try {
Collection<GroupId> subs = s.getSubscriptions();
db.setSubscriptions(txn, c, subs, s.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + subs.size() + " subscriptions");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
for(BatchId batch : lost) {
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(messageLock) {
synchronized(messageStatusLock) {
Txn txn = db.startTransaction();
try {
if(LOG.isLoggable(Level.FINE))
LOG.fine("Removing lost batch");
db.removeLostBatch(txn, c, batch);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
public void receiveTransports(ContactId c, Transports t)
throws DbException {
// Update the contact's transport details
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
Map<String, String> transports = t.getTransports();
db.setTransports(txn, c, transports, t.getTimestamp());
if(LOG.isLoggable(Level.FINE))
LOG.fine("Received " + transports.size()
+ " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}

View File

@@ -0,0 +1,11 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
interface AckFactory {
Ack createAck(Collection<BatchId> batches);
}

View File

@@ -0,0 +1,13 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
class AckFactoryImpl implements AckFactory {
public Ack createAck(Collection<BatchId> batches) {
return new AckImpl(batches);
}
}

View File

@@ -0,0 +1,19 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
public class AckImpl implements Ack {
private final Collection<BatchId> batches;
AckImpl(Collection<BatchId> batches) {
this.batches = batches;
}
public Collection<BatchId> getBatches() {
return batches;
}
}

View File

@@ -0,0 +1,33 @@
package net.sf.briar.protocol;
import java.io.IOException;
import java.util.Collection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Tags;
import net.sf.briar.api.serial.ObjectReader;
import net.sf.briar.api.serial.Reader;
public class AckReader implements ObjectReader<Ack> {
private final AckFactory ackFactory;
AckReader(AckFactory ackFactory) {
this.ackFactory = ackFactory;
}
public Ack readObject(Reader r) throws IOException {
// Initialise the consumer
CountingConsumer counting = new CountingConsumer(Ack.MAX_SIZE);
// Read and digest the data
r.addConsumer(counting);
r.readUserDefinedTag(Tags.ACK);
r.addObjectReader(Tags.BATCH_ID, new BatchIdReader());
Collection<BatchId> batches = r.readList(BatchId.class);
r.removeObjectReader(Tags.BATCH_ID);
r.removeConsumer(counting);
// Build and return the ack
return ackFactory.createAck(batches);
}
}

View File

@@ -0,0 +1,49 @@
package net.sf.briar.protocol;
import java.io.IOException;
import java.io.OutputStream;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AckWriter;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.Tags;
import net.sf.briar.api.serial.Writer;
import net.sf.briar.api.serial.WriterFactory;
class AckWriterImpl implements AckWriter {
private final OutputStream out;
private final Writer w;
private boolean started = false, finished = false;
AckWriterImpl(OutputStream out, WriterFactory writerFactory) {
this.out = out;
this.w = writerFactory.createWriter(out);
}
public boolean addBatchId(BatchId b) throws IOException {
if(finished) throw new IllegalStateException();
if(!started) {
w.writeUserDefinedTag(Tags.ACK);
w.writeListStart();
started = true;
}
int capacity = Ack.MAX_SIZE - (int) w.getBytesWritten() - 1;
if(capacity < BatchId.SERIALISED_LENGTH) return false;
b.writeTo(w);
return true;
}
public void finish() throws IOException {
if(finished) throw new IllegalStateException();
if(!started) {
w.writeUserDefinedTag(Tags.ACK);
w.writeListStart();
started = true;
}
w.writeListEnd();
out.flush();
finished = true;
}
}

View File

@@ -1,6 +1,6 @@
package net.sf.briar.protocol;
import java.util.List;
import java.util.Collection;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
@@ -8,5 +8,5 @@ import net.sf.briar.api.protocol.Message;
interface BatchFactory {
Batch createBatch(BatchId id, List<Message> messages);
Batch createBatch(BatchId id, Collection<Message> messages);
}

View File

@@ -1,6 +1,6 @@
package net.sf.briar.protocol;
import java.util.List;
import java.util.Collection;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
@@ -8,7 +8,7 @@ import net.sf.briar.api.protocol.Message;
class BatchFactoryImpl implements BatchFactory {
public Batch createBatch(BatchId id, List<Message> messages) {
public Batch createBatch(BatchId id, Collection<Message> messages) {
return new BatchImpl(id, messages);
}
}

View File

@@ -1,6 +1,6 @@
package net.sf.briar.protocol;
import java.util.List;
import java.util.Collection;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
@@ -10,9 +10,9 @@ import net.sf.briar.api.protocol.Message;
class BatchImpl implements Batch {
private final BatchId id;
private final List<Message> messages;
private final Collection<Message> messages;
BatchImpl(BatchId id, List<Message> messages) {
BatchImpl(BatchId id, Collection<Message> messages) {
this.id = id;
this.messages = messages;
}
@@ -21,7 +21,7 @@ class BatchImpl implements Batch {
return id;
}
public Iterable<Message> getMessages() {
public Collection<Message> getMessages() {
return messages;
}
}

View File

@@ -0,0 +1,62 @@
package net.sf.briar.protocol;
import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.BatchWriter;
import net.sf.briar.api.protocol.Tags;
import net.sf.briar.api.serial.Writer;
import net.sf.briar.api.serial.WriterFactory;
class BatchWriterImpl implements BatchWriter {
private final DigestOutputStream out;
private final Writer w;
private final MessageDigest messageDigest;
private boolean started = false, finished = false;
BatchWriterImpl(OutputStream out, WriterFactory writerFactory,
MessageDigest messageDigest) {
this.out = new DigestOutputStream(out, messageDigest);
w = writerFactory.createWriter(this.out);
this.messageDigest = messageDigest;
}
public int getCapacity() {
return Batch.MAX_SIZE - 3;
}
public boolean addMessage(byte[] message) throws IOException {
if(finished) throw new IllegalStateException();
if(!started) {
messageDigest.reset();
w.writeUserDefinedTag(Tags.BATCH);
w.writeListStart();
started = true;
}
int capacity = Batch.MAX_SIZE - (int) w.getBytesWritten() - 1;
if(capacity < message.length) return false;
// Bypass the writer and write each raw message directly
out.write(message);
return true;
}
public BatchId finish() throws IOException {
if(finished) throw new IllegalStateException();
if(!started) {
messageDigest.reset();
w.writeUserDefinedTag(Tags.BATCH);
w.writeListStart();
started = true;
}
w.writeListEnd();
out.flush();
finished = true;
return new BatchId(messageDigest.digest());
}
}

View File

@@ -1,58 +0,0 @@
package net.sf.briar.protocol;
import java.io.IOException;
import java.security.GeneralSecurityException;
import net.sf.briar.api.protocol.Batch;
import net.sf.briar.api.protocol.BundleReader;
import net.sf.briar.api.protocol.Header;
import net.sf.briar.api.protocol.Tags;
import net.sf.briar.api.serial.FormatException;
import net.sf.briar.api.serial.ObjectReader;
import net.sf.briar.api.serial.Reader;
class BundleReaderImpl implements BundleReader {
private static enum State { START, BATCHES, END };
private final Reader reader;
private final ObjectReader<Header> headerReader;
private final ObjectReader<Batch> batchReader;
private State state = State.START;
BundleReaderImpl(Reader reader, ObjectReader<Header> headerReader,
ObjectReader<Batch> batchReader) {
this.reader = reader;
this.headerReader = headerReader;
this.batchReader = batchReader;
}
public Header getHeader() throws IOException, GeneralSecurityException {
if(state != State.START) throw new IllegalStateException();
reader.addObjectReader(Tags.HEADER, headerReader);
Header h = reader.readUserDefined(Tags.HEADER, Header.class);
reader.removeObjectReader(Tags.HEADER);
// Expect a list of batches
reader.readListStart();
reader.addObjectReader(Tags.BATCH, batchReader);
state = State.BATCHES;
return h;
}
public Batch getNextBatch() throws IOException, GeneralSecurityException {
if(state != State.BATCHES) throw new IllegalStateException();
if(reader.hasListEnd()) {
reader.removeObjectReader(Tags.BATCH);
reader.readListEnd();
// That should be all
if(!reader.eof()) throw new FormatException();
state = State.END;
return null;
}
return reader.readUserDefined(Tags.BATCH, Batch.class);
}
public void finish() throws IOException {
reader.close();
}
}

View File

@@ -1,91 +0,0 @@
package net.sf.briar.protocol;
import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.Map;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.BundleWriter;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Tags;
import net.sf.briar.api.serial.Raw;
import net.sf.briar.api.serial.Writer;
import net.sf.briar.api.serial.WriterFactory;
class BundleWriterImpl implements BundleWriter {
private static enum State { START, FIRST_BATCH, MORE_BATCHES, END };
private final DigestOutputStream out;
private final Writer writer;
private final MessageDigest messageDigest;
private final long capacity;
private State state = State.START;
BundleWriterImpl(OutputStream out, WriterFactory writerFactory,
MessageDigest messageDigest, long capacity) {
this.out = new DigestOutputStream(out, messageDigest);
this.out.on(false); // Turn off the digest until we need it
writer = writerFactory.createWriter(this.out);
this.messageDigest = messageDigest;
this.capacity = capacity;
}
public long getRemainingCapacity() {
return capacity - writer.getBytesWritten();
}
public void addHeader(Collection<BatchId> acks, Collection<GroupId> subs,
Map<String, String> transports) throws IOException,
GeneralSecurityException {
if(state != State.START) throw new IllegalStateException();
// Write the initial tag
writer.writeUserDefinedTag(Tags.HEADER);
// Write the data
writer.writeList(acks);
writer.writeList(subs);
writer.writeMap(transports);
writer.writeInt64(System.currentTimeMillis());
// Expect a (possibly empty) list of batches
state = State.FIRST_BATCH;
}
public BatchId addBatch(Collection<Raw> messages) throws IOException,
GeneralSecurityException {
if(state == State.FIRST_BATCH) {
writer.writeListStart();
state = State.MORE_BATCHES;
}
if(state != State.MORE_BATCHES) throw new IllegalStateException();
// Write the initial tag
writer.writeUserDefinedTag(Tags.BATCH);
// Start digesting
messageDigest.reset();
out.on(true);
// Write the data
writer.writeListStart();
// Bypass the writer and write each raw message directly
for(Raw message : messages) out.write(message.getBytes());
writer.writeListEnd();
// Stop digesting
out.on(false);
// Calculate and return the ID
return new BatchId(messageDigest.digest());
}
public void finish() throws IOException {
if(state == State.FIRST_BATCH) {
writer.writeListStart();
state = State.MORE_BATCHES;
}
if(state != State.MORE_BATCHES) throw new IllegalStateException();
writer.writeListEnd();
out.flush();
out.close();
state = State.END;
}
}

View File

@@ -1,14 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import java.util.Map;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
interface HeaderFactory {
Header createHeader(Collection<BatchId> acks, Collection<GroupId> subs,
Map<String, String> transports, long timestamp);
}

View File

@@ -1,21 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
class HeaderFactoryImpl implements HeaderFactory {
public Header createHeader(Collection<BatchId> acks,
Collection<GroupId> subs, Map<String, String> transports,
long timestamp) {
Set<BatchId> ackSet = new HashSet<BatchId>(acks);
Set<GroupId> subSet = new HashSet<GroupId>(subs);
return new HeaderImpl(ackSet, subSet, transports, timestamp);
}
}

View File

@@ -1,41 +0,0 @@
package net.sf.briar.protocol;
import java.util.Map;
import java.util.Set;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
/** A simple in-memory implementation of a header. */
class HeaderImpl implements Header {
private final Set<BatchId> acks;
private final Set<GroupId> subs;
private final Map<String, String> transports;
private final long timestamp;
HeaderImpl(Set<BatchId> acks, Set<GroupId> subs,
Map<String, String> transports, long timestamp) {
this.acks = acks;
this.subs = subs;
this.transports = transports;
this.timestamp = timestamp;
}
public Set<BatchId> getAcks() {
return acks;
}
public Set<GroupId> getSubscriptions() {
return subs;
}
public Map<String, String> getTransports() {
return transports;
}
public long getTimestamp() {
return timestamp;
}
}

View File

@@ -1,47 +0,0 @@
package net.sf.briar.protocol;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import net.sf.briar.api.protocol.BatchId;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Header;
import net.sf.briar.api.protocol.Tags;
import net.sf.briar.api.serial.FormatException;
import net.sf.briar.api.serial.ObjectReader;
import net.sf.briar.api.serial.Reader;
class HeaderReader implements ObjectReader<Header> {
private final HeaderFactory headerFactory;
HeaderReader(HeaderFactory headerFactory) {
this.headerFactory = headerFactory;
}
public Header readObject(Reader r) throws IOException {
// Initialise and add the consumer
CountingConsumer counting = new CountingConsumer(Header.MAX_SIZE);
r.addConsumer(counting);
r.readUserDefinedTag(Tags.HEADER);
// Acks
r.addObjectReader(Tags.BATCH_ID, new BatchIdReader());
Collection<BatchId> acks = r.readList(BatchId.class);
r.removeObjectReader(Tags.BATCH_ID);
// Subs
r.addObjectReader(Tags.GROUP_ID, new GroupIdReader());
Collection<GroupId> subs = r.readList(GroupId.class);
r.removeObjectReader(Tags.GROUP_ID);
// Transports
Map<String, String> transports =
r.readMap(String.class, String.class);
// Timestamp
long timestamp = r.readInt64();
if(timestamp < 0L) throw new FormatException();
// Remove the consumer
r.removeConsumer(counting);
// Build and return the header
return headerFactory.createHeader(acks, subs, transports, timestamp);
}
}

View File

@@ -1,8 +1,5 @@
package net.sf.briar.protocol;
import net.sf.briar.api.protocol.BundleReader;
import net.sf.briar.api.protocol.BundleWriter;
import com.google.inject.AbstractModule;
public class ProtocolModule extends AbstractModule {
@@ -10,8 +7,5 @@ public class ProtocolModule extends AbstractModule {
@Override
protected void configure() {
bind(BatchFactory.class).to(BatchFactoryImpl.class);
bind(BundleReader.class).to(BundleReaderImpl.class);
bind(BundleWriter.class).to(BundleWriterImpl.class);
bind(HeaderFactory.class).to(HeaderFactoryImpl.class);
}
}