Part 1 of a major BMP and database refactoring. Tests are broken!

The old logic for selecting when to send subscription and transport
updates has been removed and not yet replaced. Subscription times have
been removed from subscription updates. The database expiry time has
been remove from subscription updates and will later get its own update
packet. Transport updates have been broken up into one update per
transport. Acks for subscription and transport updates have been added.
This commit is contained in:
akwizgran
2013-01-25 15:38:37 +00:00
parent b8247968b6
commit 64bf1fbbb1
67 changed files with 1411 additions and 1762 deletions

View File

@@ -15,8 +15,11 @@ import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.transport.ContactTransport;
import net.sf.briar.api.transport.TemporarySecret;
@@ -73,7 +76,7 @@ interface Database<T> {
/**
* Adds a new contact to the database and returns an ID for the contact.
* <p>
* Locking: contact write, subscription write, transport write.
* Locking: contact write, subscription write.
*/
ContactId addContact(T txn) throws DbException;
@@ -85,8 +88,8 @@ interface Database<T> {
void addContactTransport(T txn, ContactTransport ct) throws DbException;
/**
* Returns false if the given message is already in the database. Otherwise
* stores the message and returns true.
* Stores the given message, or returns false if the message is already in
* the database.
* <p>
* Locking: message write.
*/
@@ -108,8 +111,8 @@ interface Database<T> {
throws DbException;
/**
* Returns false if the given message is already in the database. Otherwise
* stores the message and returns true.
* Stores the given message, or returns false if the message is already in
* the database.
* <p>
* Locking: contact read, message write.
*/
@@ -132,18 +135,16 @@ interface Database<T> {
void addSubscription(T txn, Group g) throws DbException;
/**
* Records the given contact's subscription to the given group starting at
* the given time.
* Adds a new transport to the database.
* <p>
* Locking: contact read, subscription write.
* Locking: transport write.
*/
void addSubscription(T txn, ContactId c, Group g, long start)
throws DbException;
void addTransport(T txn, TransportId t) throws DbException;
/**
* Makes the given group visible to the given contact.
* <p>
* Locking: contact read, subscription write.
* Locking: contact write, subscription write.
*/
void addVisibility(T txn, ContactId c, GroupId g) throws DbException;
@@ -177,23 +178,13 @@ interface Database<T> {
boolean containsSubscription(T txn, GroupId g) throws DbException;
/**
* Returns true if the user has been subscribed to the given group since
* the given time.
* <p>
* Locking: subscription read.
*/
boolean containsSubscription(T txn, GroupId g, long time)
throws DbException;
/**
* Returns true if the user is subscribed to the given group, the group is
* visible to the given contact, and the subscription has existed since the
* given time.
* Returns true if the user subscribes to the given group and the
* subscription is visible to the given contact.
* <p>
* Locking: contact read, subscription read.
*/
boolean containsVisibleSubscription(T txn, GroupId g, ContactId c,
long time) throws DbException;
boolean containsVisibleSubscription(T txn, ContactId c, GroupId g)
throws DbException;
/**
* Returns the configuration for the given transport.
@@ -249,13 +240,6 @@ interface Database<T> {
TransportProperties getLocalProperties(T txn, TransportId t)
throws DbException;
/**
* Returns all local transports.
* <p>
* Locking: transport read.
*/
Collection<Transport> getLocalTransports(T txn) throws DbException;
/**
* Returns the message identified by the given ID, in serialised form.
* <p>
@@ -402,20 +386,42 @@ interface Database<T> {
Collection<Group> getSubscriptions(T txn, ContactId c) throws DbException;
/**
* Returns the time at which the local transports were last modified.
* Returns a subscription ack for the given contact, or null if no ack is
* due.
* <p>
* Locking: transport read.
* Locking: contact read, subscription write.
*/
long getTransportsModified(T txn) throws DbException;
SubscriptionAck getSubscriptionAck(T txn, ContactId c) throws DbException;
/**
* Returns the time at which a transport update was last sent to the given
* contact.
* Returns a subscription update for the given contact, or null if no
* update is due.
* <p>
* Locking: contact read, transport read.
* Locking: contact read, subscription write.
*/
long getTransportsSent(T txn, ContactId c) throws DbException;
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c)
throws DbException;
/**
* Returns a collection of transport acks for the given contact, or null if
* no acks are due.
* <p>
* Locking: contact read, transport write.
*/
Collection<TransportAck> getTransportAcks(T txn, ContactId c)
throws DbException;
/**
* Returns a collection of transport updates for the given contact, or
* null if no updates are due.
* <p>
* Locking: contact read, transport write.
*/
Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c)
throws DbException;
/**
* Returns the version number of the
/**
* Returns the number of unread messages in each subscribed group.
* <p>
@@ -430,26 +436,6 @@ interface Database<T> {
*/
Collection<ContactId> getVisibility(T txn, GroupId g) throws DbException;
/**
* Returns any holes covering unsubscriptions that are visible to the given
* contact, occurred strictly before the given timestamp, and have not yet
* been acknowledged.
* <p>
* Locking: contact read, subscription read.
*/
Map<GroupId, GroupId> getVisibleHoles(T txn, ContactId c, long timestamp)
throws DbException;
/**
* Returns any subscriptions that are visible to the given contact,
* occurred strictly before the given timestamp, and have not yet been
* acknowledged.
* <p>
* Locking: contact read, subscription read.
*/
Map<Group, Long> getVisibleSubscriptions(T txn, ContactId c, long timestamp)
throws DbException;
/**
* Returns true if any messages are sendable to the given contact.
* <p>
@@ -523,25 +509,22 @@ interface Database<T> {
* Unsubscribes from the given group. Any messages belonging to the group
* are deleted from the database.
* <p>
* Locking: contact read, message write, messageFlag write,
* Locking: contact write, message write, messageFlag write,
* messageStatus write, subscription write.
*/
void removeSubscription(T txn, GroupId g) throws DbException;
/**
* Removes any subscriptions for the given contact with IDs between the
* given IDs. If both of the given IDs are null, all subscriptions are
* removed. If only the first is null, all subscriptions with IDs less than
* the second ID are removed. If onlt the second is null, all subscriptions
* with IDs greater than the first are removed.
* Removes a transport (and all associated state) from the database.
* <p>
* Locking: contact read, transport write.
*/
void removeSubscriptions(T txn, ContactId c, GroupId start, GroupId end)
throws DbException;
void removeTransport(T txn, TransportId t) throws DbException;
/**
* Makes the given group invisible to the given contact.
* <p>
* Locking: contact read, subscription write.
* Locking: contact write, subscription write.
*/
void removeVisibility(T txn, ContactId c, GroupId g) throws DbException;
@@ -557,7 +540,7 @@ interface Database<T> {
/**
* Sets the given contact's database expiry time.
* <p>
* Locking: contact read, subscription write.
* Locking: contact write.
*/
void setExpiryTime(T txn, ContactId c, long expiry) throws DbException;
@@ -576,6 +559,17 @@ interface Database<T> {
*/
boolean setRead(T txn, MessageId m, boolean read) throws DbException;
/**
* Updates the remote transport properties for the given contact and the
* given transport, replacing any existing properties, unless an update
* with an equal or higher version number has already been received from
* the contact.
* <p>
* Locking: contact read, transport write.
*/
void setRemoteProperties(T txn, ContactId c, TransportUpdate t)
throws DbException;
/**
* Sets the sendability score of the given message.
* <p>
@@ -612,45 +606,30 @@ interface Database<T> {
throws DbException;
/**
* Records the time of the latest subscription update acknowledged by the
* given contact.
* Updates the groups to which the given contact subscribes, unless an
* update with an equal or higher version number has already been received
* from the contact.
* <p>
* Locking: contact read, subscription write.
*/
void setSubscriptionsAcked(T txn, ContactId c, long timestamp)
void setSubscriptions(T txn, ContactId c, SubscriptionUpdate s)
throws DbException;
/**
* Records the time of the latest subscription update received from the
* given contact.
* Records a subscription ack from the given contact for the given version
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: contact read, subscription write.
*/
void setSubscriptionsReceived(T txn, ContactId c, long timestamp)
void setSubscriptionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
/**
* Sets the transports for the given contact, replacing any existing
* transports unless the existing transports have a newer timestamp.
* Records a transport ack from the give contact for the given version
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: contact read, transport write.
*/
void setTransports(T txn, ContactId c, Collection<Transport> transports,
long timestamp) throws DbException;
/**
* Records the time at which the local transports were last modified.
* <p>
* Locking: contact read, transport write.
*/
void setTransportsModified(T txn, long timestamp) throws DbException;
/**
* Records the time at which a transport update was last sent to the given
* contact.
* <p>
* Locking: contact read, transport write.
*/
void setTransportsSent(T txn, ContactId c, long timestamp)
throws DbException;
void setTransportUpdateAcked(T txn, ContactId c, TransportId t,
long version) throws DbException;
}

View File

@@ -5,7 +5,6 @@ import static net.sf.briar.db.DatabaseConstants.BYTES_PER_SWEEP;
import static net.sf.briar.db.DatabaseConstants.CRITICAL_FREE_SPACE;
import static net.sf.briar.db.DatabaseConstants.MAX_BYTES_BETWEEN_SPACE_CHECKS;
import static net.sf.briar.db.DatabaseConstants.MAX_MS_BETWEEN_SPACE_CHECKS;
import static net.sf.briar.db.DatabaseConstants.MAX_UPDATE_INTERVAL;
import static net.sf.briar.db.DatabaseConstants.MIN_FREE_SPACE;
import java.io.IOException;
@@ -17,7 +16,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
@@ -36,12 +34,11 @@ import net.sf.briar.api.db.event.ContactAddedEvent;
import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseEvent;
import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.MessageReceivedEvent;
import net.sf.briar.api.db.event.RatingChangedEvent;
import net.sf.briar.api.db.event.RemoteTransportsUpdatedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
import net.sf.briar.api.db.event.TransportsUpdatedEvent;
import net.sf.briar.api.lifecycle.ShutdownManager;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.AuthorId;
@@ -50,10 +47,10 @@ import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.Message;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.transport.ContactTransport;
@@ -98,7 +95,6 @@ DatabaseCleaner.Callback {
private final Database<T> db;
private final DatabaseCleaner cleaner;
private final ShutdownManager shutdown;
private final PacketFactory packetFactory;
private final Clock clock;
private final Collection<DatabaseListener> listeners =
@@ -114,12 +110,10 @@ DatabaseCleaner.Callback {
@Inject
DatabaseComponentImpl(Database<T> db, DatabaseCleaner cleaner,
ShutdownManager shutdown, PacketFactory packetFactory,
Clock clock) {
ShutdownManager shutdown, Clock clock) {
this.db = db;
this.cleaner = cleaner;
this.shutdown = shutdown;
this.packetFactory = packetFactory;
this.clock = clock;
}
@@ -173,23 +167,13 @@ DatabaseCleaner.Callback {
try {
subscriptionLock.writeLock().lock();
try {
transportLock.writeLock().lock();
T txn = db.startTransaction();
try {
windowLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
c = db.addContact(txn);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
windowLock.writeLock().unlock();
}
} finally {
transportLock.writeLock().unlock();
c = db.addContact(txn);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
@@ -243,12 +227,9 @@ DatabaseCleaner.Callback {
T txn = db.startTransaction();
try {
// Don't store the message if the user has
// unsubscribed from the group or the message
// predates the subscription
if(db.containsSubscription(txn, m.getGroup(),
m.getTimestamp())) {
// unsubscribed from the group
if(db.containsSubscription(txn, m.getGroup()))
added = storeGroupMessage(txn, m, null);
}
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -418,11 +399,27 @@ DatabaseCleaner.Callback {
}
}
public void addTransport(TransportId t) throws DbException {
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
db.addTransport(txn, t);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.writeLock().unlock();
}
}
/**
* If the given message is already in the database, returns false.
* Otherwise stores the message and marks it as new or seen with respect to
* the given contact, depending on whether the message is outgoing or
* incoming, respectively.
* incoming, respectively; or returns false if the message is already in
* the database.
* <p>
* Locking: contact read, message write, messageStatus write.
*/
@@ -478,7 +475,7 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
return packetFactory.createAck(acked);
return new Ack(acked);
}
public Collection<byte[]> generateBatch(ContactId c, int maxLength)
@@ -627,80 +624,94 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
return packetFactory.createOffer(offered);
return new Offer(offered);
}
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c)
public SubscriptionAck generateSubscriptionAck(ContactId c)
throws DbException {
Map<GroupId, GroupId> holes;
Map<Group, Long> subs;
long expiry, timestamp;
contactLock.readLock().lock();
try {
subscriptionLock.readLock().lock();
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
timestamp = clock.currentTimeMillis() - 1;
holes = db.getVisibleHoles(txn, c, timestamp);
subs = db.getVisibleSubscriptions(txn, c, timestamp);
expiry = db.getExpiryTime(txn);
SubscriptionAck a = db.getSubscriptionAck(txn, c);
db.commitTransaction(txn);
return a;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.readLock().unlock();
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
return packetFactory.createSubscriptionUpdate(holes, subs, expiry,
timestamp);
}
private boolean updateIsDue(long sent) {
long now = clock.currentTimeMillis();
return now - sent >= MAX_UPDATE_INTERVAL;
}
public TransportUpdate generateTransportUpdate(ContactId c)
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c)
throws DbException {
boolean due;
Collection<Transport> transports;
long timestamp;
contactLock.readLock().lock();
try {
transportLock.readLock().lock();
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
// Work out whether an update is due
long modified = db.getTransportsModified(txn);
long sent = db.getTransportsSent(txn, c);
due = modified >= sent || updateIsDue(sent);
SubscriptionUpdate s = db.getSubscriptionUpdate(txn, c);
db.commitTransaction(txn);
return s;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
subscriptionLock.writeLock().unlock();
}
if(!due) return null;
} finally {
contactLock.readLock().unlock();
}
}
public Collection<TransportAck> generateTransportAcks(ContactId c)
throws DbException {
contactLock.readLock().lock();
try {
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
transports = db.getLocalTransports(txn);
timestamp = clock.currentTimeMillis();
db.setTransportsSent(txn, c, timestamp);
Collection<TransportAck> acks = db.getTransportAcks(txn, c);
db.commitTransaction(txn);
return acks;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public Collection<TransportUpdate> generateTransportUpdates(ContactId c)
throws DbException {
contactLock.readLock().lock();
try {
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<TransportUpdate> updates =
db.getTransportUpdates(txn, c);
db.commitTransaction(txn);
return updates;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
@@ -711,7 +722,6 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
return packetFactory.createTransportUpdate(transports, timestamp);
}
public TransportConfig getConfig(TransportId t) throws DbException {
@@ -766,23 +776,6 @@ DatabaseCleaner.Callback {
}
}
public Collection<Transport> getLocalTransports() throws DbException {
transportLock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Collection<Transport> transports = db.getLocalTransports(txn);
db.commitTransaction(txn);
return transports;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
}
public Collection<MessageHeader> getMessageHeaders(GroupId g)
throws DbException {
messageLock.readLock().lock();
@@ -1022,7 +1015,6 @@ DatabaseCleaner.Callback {
try {
if(!p.equals(db.getLocalProperties(txn, t))) {
db.mergeLocalProperties(txn, t, p);
db.setTransportsModified(txn, clock.currentTimeMillis());
changed = true;
}
db.commitTransaction(txn);
@@ -1034,7 +1026,7 @@ DatabaseCleaner.Callback {
transportLock.writeLock().unlock();
}
// Call the listeners outside the lock
if(changed) callListeners(new LocalTransportsUpdatedEvent());
if(changed) callListeners(new TransportsUpdatedEvent());
}
public void receiveAck(ContactId c, Ack a) throws DbException {
@@ -1114,8 +1106,7 @@ DatabaseCleaner.Callback {
throws DbException {
GroupId g = m.getGroup();
if(g == null) return storePrivateMessage(txn, m, c, true);
if(!db.containsVisibleSubscription(txn, g, c, m.getTimestamp()))
return false;
if(!db.containsVisibleSubscription(txn, c, g)) return false;
return storeGroupMessage(txn, m, c);
}
@@ -1161,12 +1152,11 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
return packetFactory.createRequest(request, offered.size());
return new Request(request, offered.size());
}
public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s)
public void receiveSubscriptionAck(ContactId c, SubscriptionAck a)
throws DbException {
// Update the contact's subscriptions
contactLock.readLock().lock();
try {
subscriptionLock.writeLock().lock();
@@ -1175,17 +1165,8 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
Map<GroupId, GroupId> holes = s.getHoles();
for(Entry<GroupId, GroupId> e : holes.entrySet()) {
GroupId start = e.getKey(), end = e.getValue();
db.removeSubscriptions(txn, c, start, end);
}
Map<Group, Long> subs = s.getSubscriptions();
for(Entry<Group, Long> e : subs.entrySet()) {
db.addSubscription(txn, c, e.getKey(), e.getValue());
}
db.setExpiryTime(txn, c, s.getExpiryTime());
db.setSubscriptionsReceived(txn, c, s.getTimestamp());
long version = a.getVersionNumber();
db.setSubscriptionUpdateAcked(txn, c, version);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1197,15 +1178,34 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
callListeners(new SubscriptionsUpdatedEvent(
Collections.singletonList(c)));
}
public void receiveTransportUpdate(ContactId c, TransportUpdate t)
public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s)
throws DbException {
contactLock.readLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setSubscriptions(txn, c, s);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveTransportAck(ContactId c, TransportAck a)
throws DbException {
Collection<Transport> transports;
// Update the contact's transport properties
contactLock.readLock().lock();
try {
transportLock.writeLock().lock();
@@ -1214,8 +1214,33 @@ DatabaseCleaner.Callback {
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
transports = t.getTransports();
db.setTransports(txn, c, transports, t.getTimestamp());
TransportId t = a.getId();
long version = a.getVersionNumber();
db.setTransportUpdateAcked(txn, c, t, version);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public void receiveTransportUpdate(ContactId c, TransportUpdate t)
throws DbException {
contactLock.readLock().lock();
try {
transportLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if(!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setRemoteProperties(txn, c, t);
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
@@ -1227,8 +1252,6 @@ DatabaseCleaner.Callback {
} finally {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
callListeners(new RemoteTransportsUpdatedEvent(c, transports));
}
public void removeContact(ContactId c) throws DbException {
@@ -1398,8 +1421,7 @@ DatabaseCleaner.Callback {
public void setVisibility(GroupId g, Collection<ContactId> visible)
throws DbException {
List<ContactId> affected = new ArrayList<ContactId>();
contactLock.readLock().lock();
contactLock.writeLock().lock();
try {
subscriptionLock.writeLock().lock();
try {
@@ -1413,13 +1435,8 @@ DatabaseCleaner.Callback {
for(ContactId c : db.getContacts(txn)) {
boolean then = oldVisible.contains(c);
boolean now = visible.contains(c);
if(!then && now) {
db.addVisibility(txn, c, g);
affected.add(c);
} else if(then && !now) {
db.removeVisibility(txn, c, g);
affected.add(c);
}
if(!then && now) db.addVisibility(txn, c, g);
else if(then && !now) db.removeVisibility(txn, c, g);
}
db.commitTransaction(txn);
} catch(DbException e) {
@@ -1430,12 +1447,7 @@ DatabaseCleaner.Callback {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
// Call the listeners outside the lock
if(!affected.isEmpty()) {
affected = Collections.unmodifiableList(affected);
callListeners(new SubscriptionsUpdatedEvent(affected));
contactLock.writeLock().unlock();
}
}
@@ -1459,7 +1471,7 @@ DatabaseCleaner.Callback {
public void unsubscribe(GroupId g) throws DbException {
Collection<ContactId> affected = null;
contactLock.readLock().lock();
contactLock.writeLock().lock();
try {
messageLock.writeLock().lock();
try {
@@ -1493,7 +1505,7 @@ DatabaseCleaner.Callback {
messageLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
contactLock.writeLock().unlock();
}
// Call the listeners outside the lock
if(affected != null && !affected.isEmpty())

View File

@@ -8,7 +8,6 @@ import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DatabaseConfig;
import net.sf.briar.api.db.DatabaseExecutor;
import net.sf.briar.api.lifecycle.ShutdownManager;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.util.BoundedExecutor;
import com.google.inject.AbstractModule;
@@ -41,15 +40,14 @@ public class DatabaseModule extends AbstractModule {
}
@Provides
Database<Connection> getDatabase(Clock clock, DatabaseConfig config) {
return new H2Database(clock, config);
Database<Connection> getDatabase(DatabaseConfig config) {
return new H2Database(config);
}
@Provides @Singleton
DatabaseComponent getDatabaseComponent(Database<Connection> db,
DatabaseCleaner cleaner, ShutdownManager shutdown,
PacketFactory packetFactory, Clock clock) {
DatabaseCleaner cleaner, ShutdownManager shutdown, Clock clock) {
return new DatabaseComponentImpl<Connection>(db, cleaner, shutdown,
packetFactory, clock);
clock);
}
}

View File

@@ -30,8 +30,8 @@ class H2Database extends JdbcDatabase {
private final long maxSize;
@Inject
H2Database(Clock clock, DatabaseConfig config) {
super(clock, HASH_TYPE, BINARY_TYPE, COUNTER_TYPE, SECRET_TYPE);
H2Database(DatabaseConfig config) {
super(HASH_TYPE, BINARY_TYPE, COUNTER_TYPE, SECRET_TYPE);
home = new File(config.getDataDirectory(), "db");
url = "jdbc:h2:split:" + home.getPath()
+ ";CIPHER=AES;DB_CLOSE_ON_EXIT=false";

File diff suppressed because it is too large Load Diff

View File

@@ -104,7 +104,7 @@ abstract class Connector extends Thread {
protected byte[] receivePublicKeyHash(Reader r) throws IOException {
byte[] b = r.readBytes(HASH_LENGTH);
if(b.length != HASH_LENGTH) throw new FormatException();
if(b.length < HASH_LENGTH) throw new FormatException();
if(LOG.isLoggable(INFO)) LOG.info(pluginName + " received hash");
return b;
}

View File

@@ -1,19 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.MessageId;
class AckImpl implements Ack {
private final Collection<MessageId> acked;
AckImpl(Collection<MessageId> acked) {
this.acked = acked;
}
public Collection<MessageId> getMessageIds() {
return acked;
}
}

View File

@@ -12,7 +12,6 @@ import net.sf.briar.api.Bytes;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.UniqueId;
import net.sf.briar.api.serial.Consumer;
import net.sf.briar.api.serial.CountingConsumer;
@@ -21,18 +20,11 @@ import net.sf.briar.api.serial.StructReader;
class AckReader implements StructReader<Ack> {
private final PacketFactory packetFactory;
AckReader(PacketFactory packetFactory) {
this.packetFactory = packetFactory;
}
public Ack readStruct(Reader r) throws IOException {
// Initialise the consumer
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
// Read the data
r.addConsumer(counting);
r.readStructId(ACK);
// Read the message IDs as byte arrays
r.setMaxBytesLength(UniqueId.LENGTH);
List<Bytes> raw = r.readList(Bytes.class);
r.resetMaxBytesLength();
@@ -46,6 +38,6 @@ class AckReader implements StructReader<Ack> {
acked.add(new MessageId(b.getBytes()));
}
// Build and return the ack
return packetFactory.createAck(Collections.unmodifiableList(acked));
return new Ack(Collections.unmodifiableList(acked));
}
}

View File

@@ -27,7 +27,7 @@ class AuthorFactoryImpl implements AuthorFactory {
}
public Author createAuthor(String name, byte[] publicKey)
throws IOException {
throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Writer w = writerFactory.createWriter(out);
w.writeStructId(AUTHOR);
@@ -36,10 +36,6 @@ class AuthorFactoryImpl implements AuthorFactory {
MessageDigest messageDigest = crypto.getMessageDigest();
messageDigest.update(out.toByteArray());
AuthorId id = new AuthorId(messageDigest.digest());
return new AuthorImpl(id, name, publicKey);
}
public Author createAuthor(AuthorId id, String name, byte[] publicKey) {
return new AuthorImpl(id, name, publicKey);
return new Author(id, name, publicKey);
}
}

View File

@@ -1,29 +0,0 @@
package net.sf.briar.protocol;
import net.sf.briar.api.protocol.Author;
import net.sf.briar.api.protocol.AuthorId;
class AuthorImpl implements Author {
private final AuthorId id;
private final String name;
private final byte[] publicKey;
AuthorImpl(AuthorId id, String name, byte[] publicKey) {
this.id = id;
this.name = name;
this.publicKey = publicKey;
}
public AuthorId getId() {
return id;
}
public String getName() {
return name;
}
public byte[] getPublicKey() {
return publicKey;
}
}

View File

@@ -9,7 +9,6 @@ import java.io.IOException;
import net.sf.briar.api.crypto.CryptoComponent;
import net.sf.briar.api.crypto.MessageDigest;
import net.sf.briar.api.protocol.Author;
import net.sf.briar.api.protocol.AuthorFactory;
import net.sf.briar.api.protocol.AuthorId;
import net.sf.briar.api.serial.DigestingConsumer;
import net.sf.briar.api.serial.Reader;
@@ -18,15 +17,12 @@ import net.sf.briar.api.serial.StructReader;
class AuthorReader implements StructReader<Author> {
private final MessageDigest messageDigest;
private final AuthorFactory authorFactory;
AuthorReader(CryptoComponent crypto, AuthorFactory authorFactory) {
AuthorReader(CryptoComponent crypto) {
messageDigest = crypto.getMessageDigest();
this.authorFactory = authorFactory;
}
public Author readStruct(Reader r) throws IOException {
// Initialise the consumer
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
// Read and digest the data
r.addConsumer(digesting);
@@ -36,6 +32,6 @@ class AuthorReader implements StructReader<Author> {
r.removeConsumer(digesting);
// Build and return the author
AuthorId id = new AuthorId(messageDigest.digest());
return authorFactory.createAuthor(id, name, publicKey);
return new Author(id, name, publicKey);
}
}

View File

@@ -23,7 +23,6 @@ class GroupReader implements StructReader<Group> {
}
public Group readStruct(Reader r) throws IOException {
// Initialise the consumer
DigestingConsumer digesting = new DigestingConsumer(messageDigest);
// Read and digest the data
r.addConsumer(digesting);

View File

@@ -46,7 +46,7 @@ class MessageReader implements StructReader<UnverifiedMessage> {
r.readNull();
} else {
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length != UniqueId.LENGTH) throw new FormatException();
if(b.length < UniqueId.LENGTH) throw new FormatException();
parent = new MessageId(b);
}
// Read the group, if there is one
@@ -74,7 +74,7 @@ class MessageReader implements StructReader<UnverifiedMessage> {
if(timestamp < 0L) throw new FormatException();
// Read the salt
byte[] salt = r.readBytes(SALT_LENGTH);
if(salt.length != SALT_LENGTH) throw new FormatException();
if(salt.length < SALT_LENGTH) throw new FormatException();
// Read the message body
byte[] body = r.readBytes(MAX_BODY_LENGTH);
// Record the offset of the body within the message

View File

@@ -1,19 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
class OfferImpl implements Offer {
private final Collection<MessageId> offered;
OfferImpl(Collection<MessageId> offered) {
this.offered = offered;
}
public Collection<MessageId> getMessageIds() {
return offered;
}
}

View File

@@ -12,7 +12,6 @@ import net.sf.briar.api.Bytes;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.UniqueId;
import net.sf.briar.api.serial.Consumer;
import net.sf.briar.api.serial.CountingConsumer;
@@ -21,18 +20,11 @@ import net.sf.briar.api.serial.StructReader;
class OfferReader implements StructReader<Offer> {
private final PacketFactory packetFactory;
OfferReader(PacketFactory packetFactory) {
this.packetFactory = packetFactory;
}
public Offer readStruct(Reader r) throws IOException {
// Initialise the consumer
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
// Read the data
r.addConsumer(counting);
r.readStructId(OFFER);
// Read the message IDs as byte arrays
r.setMaxBytesLength(UniqueId.LENGTH);
List<Bytes> raw = r.readList(Bytes.class);
r.resetMaxBytesLength();
@@ -46,7 +38,6 @@ class OfferReader implements StructReader<Offer> {
messages.add(new MessageId(b.getBytes()));
}
// Build and return the offer
return packetFactory.createOffer(Collections.unmodifiableList(
messages));
return new Offer(Collections.unmodifiableList(messages));
}
}

View File

@@ -1,42 +0,0 @@
package net.sf.briar.protocol;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.protocol.TransportUpdate;
class PacketFactoryImpl implements PacketFactory {
public Ack createAck(Collection<MessageId> acked) {
return new AckImpl(acked);
}
public Offer createOffer(Collection<MessageId> offered) {
return new OfferImpl(offered);
}
public Request createRequest(BitSet requested, int length) {
return new RequestImpl(requested, length);
}
public SubscriptionUpdate createSubscriptionUpdate(
Map<GroupId, GroupId> holes, Map<Group, Long> subs, long expiry,
long timestamp) {
return new SubscriptionUpdateImpl(holes, subs, expiry, timestamp);
}
public TransportUpdate createTransportUpdate(
Collection<Transport> transports, long timestamp) {
return new TransportUpdateImpl(transports, timestamp);
}
}

View File

@@ -11,11 +11,12 @@ import net.sf.briar.api.protocol.GroupFactory;
import net.sf.briar.api.protocol.MessageFactory;
import net.sf.briar.api.protocol.MessageVerifier;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.ProtocolWriterFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.protocol.VerificationExecutor;
@@ -48,7 +49,6 @@ public class ProtocolModule extends AbstractModule {
bind(GroupFactory.class).to(GroupFactoryImpl.class);
bind(MessageFactory.class).to(MessageFactoryImpl.class);
bind(MessageVerifier.class).to(MessageVerifierImpl.class);
bind(PacketFactory.class).to(PacketFactoryImpl.class);
bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class);
bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class);
// The executor is bounded, so tasks must be independent and short-lived
@@ -59,14 +59,13 @@ public class ProtocolModule extends AbstractModule {
}
@Provides
StructReader<Ack> getAckReader(PacketFactory ackFactory) {
return new AckReader(ackFactory);
StructReader<Ack> getAckReader() {
return new AckReader();
}
@Provides
StructReader<Author> getAuthorReader(CryptoComponent crypto,
AuthorFactory authorFactory) {
return new AuthorReader(crypto, authorFactory);
StructReader<Author> getAuthorReader(CryptoComponent crypto) {
return new AuthorReader(crypto);
}
@Provides
@@ -82,24 +81,33 @@ public class ProtocolModule extends AbstractModule {
}
@Provides
StructReader<Offer> getOfferReader(PacketFactory packetFactory) {
return new OfferReader(packetFactory);
StructReader<Offer> getOfferReader() {
return new OfferReader();
}
@Provides
StructReader<Request> getRequestReader(PacketFactory packetFactory) {
return new RequestReader(packetFactory);
StructReader<Request> getRequestReader() {
return new RequestReader();
}
@Provides
StructReader<SubscriptionUpdate> getSubscriptionReader(
StructReader<Group> groupReader, PacketFactory packetFactory) {
return new SubscriptionUpdateReader(groupReader, packetFactory);
StructReader<SubscriptionAck> getSubscriptionAckReader() {
return new SubscriptionAckReader();
}
@Provides
StructReader<TransportUpdate> getTransportReader(
PacketFactory packetFactory) {
return new TransportUpdateReader(packetFactory);
StructReader<SubscriptionUpdate> getSubscriptionUpdateReader(
StructReader<Group> groupReader) {
return new SubscriptionUpdateReader(groupReader);
}
@Provides
StructReader<TransportAck> getTransportAckReader() {
return new TransportAckReader();
}
@Provides
StructReader<TransportUpdate> getTransportUpdateReader() {
return new TransportUpdateReader();
}
}

View File

@@ -7,7 +7,9 @@ import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.serial.ReaderFactory;
@@ -16,6 +18,7 @@ import net.sf.briar.api.serial.StructReader;
import com.google.inject.Inject;
import com.google.inject.Provider;
// FIXME: Refactor this package to reduce boilerplate
class ProtocolReaderFactoryImpl implements ProtocolReaderFactory {
private final ReaderFactory readerFactory;
@@ -23,8 +26,10 @@ class ProtocolReaderFactoryImpl implements ProtocolReaderFactory {
private final Provider<StructReader<UnverifiedMessage>> messageProvider;
private final Provider<StructReader<Offer>> offerProvider;
private final Provider<StructReader<Request>> requestProvider;
private final Provider<StructReader<SubscriptionUpdate>> subscriptionProvider;
private final Provider<StructReader<TransportUpdate>> transportProvider;
private final Provider<StructReader<SubscriptionAck>> subscriptionAckProvider;
private final Provider<StructReader<SubscriptionUpdate>> subscriptionUpdateProvider;
private final Provider<StructReader<TransportAck>> transportAckProvider;
private final Provider<StructReader<TransportUpdate>> transportUpdateProvider;
@Inject
ProtocolReaderFactoryImpl(ReaderFactory readerFactory,
@@ -32,21 +37,26 @@ class ProtocolReaderFactoryImpl implements ProtocolReaderFactory {
Provider<StructReader<UnverifiedMessage>> messageProvider,
Provider<StructReader<Offer>> offerProvider,
Provider<StructReader<Request>> requestProvider,
Provider<StructReader<SubscriptionUpdate>> subscriptionProvider,
Provider<StructReader<TransportUpdate>> transportProvider) {
Provider<StructReader<SubscriptionAck>> subscriptionAckProvider,
Provider<StructReader<SubscriptionUpdate>> subscriptionUpdateProvider,
Provider<StructReader<TransportAck>> transportAckProvider,
Provider<StructReader<TransportUpdate>> transportUpdateProvider) {
this.readerFactory = readerFactory;
this.ackProvider = ackProvider;
this.messageProvider = messageProvider;
this.offerProvider = offerProvider;
this.requestProvider = requestProvider;
this.subscriptionProvider = subscriptionProvider;
this.transportProvider = transportProvider;
this.subscriptionAckProvider = subscriptionAckProvider;
this.subscriptionUpdateProvider = subscriptionUpdateProvider;
this.transportAckProvider = transportAckProvider;
this.transportUpdateProvider = transportUpdateProvider;
}
public ProtocolReader createProtocolReader(InputStream in) {
return new ProtocolReaderImpl(in, readerFactory, ackProvider.get(),
messageProvider.get(), offerProvider.get(),
requestProvider.get(), subscriptionProvider.get(),
transportProvider.get());
requestProvider.get(), subscriptionAckProvider.get(),
subscriptionUpdateProvider.get(), transportAckProvider.get(),
transportUpdateProvider.get());
}
}

View File

@@ -4,7 +4,9 @@ import static net.sf.briar.api.protocol.Types.ACK;
import static net.sf.briar.api.protocol.Types.MESSAGE;
import static net.sf.briar.api.protocol.Types.OFFER;
import static net.sf.briar.api.protocol.Types.REQUEST;
import static net.sf.briar.api.protocol.Types.SUBSCRIPTION_ACK;
import static net.sf.briar.api.protocol.Types.SUBSCRIPTION_UPDATE;
import static net.sf.briar.api.protocol.Types.TRANSPORT_ACK;
import static net.sf.briar.api.protocol.Types.TRANSPORT_UPDATE;
import java.io.IOException;
@@ -14,7 +16,9 @@ import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedMessage;
import net.sf.briar.api.serial.Reader;
@@ -30,15 +34,19 @@ class ProtocolReaderImpl implements ProtocolReader {
StructReader<UnverifiedMessage> messageReader,
StructReader<Offer> offerReader,
StructReader<Request> requestReader,
StructReader<SubscriptionUpdate> subscriptionReader,
StructReader<TransportUpdate> transportReader) {
StructReader<SubscriptionAck> subscriptionAckReader,
StructReader<SubscriptionUpdate> subscriptionUpdateReader,
StructReader<TransportAck> transportAckReader,
StructReader<TransportUpdate> transportUpdateReader) {
reader = readerFactory.createReader(in);
reader.addStructReader(ACK, ackReader);
reader.addStructReader(MESSAGE, messageReader);
reader.addStructReader(OFFER, offerReader);
reader.addStructReader(REQUEST, requestReader);
reader.addStructReader(SUBSCRIPTION_UPDATE, subscriptionReader);
reader.addStructReader(TRANSPORT_UPDATE, transportReader);
reader.addStructReader(SUBSCRIPTION_ACK, subscriptionAckReader);
reader.addStructReader(SUBSCRIPTION_UPDATE, subscriptionUpdateReader);
reader.addStructReader(TRANSPORT_ACK, transportAckReader);
reader.addStructReader(TRANSPORT_UPDATE, transportUpdateReader);
}
public boolean eof() throws IOException {
@@ -77,6 +85,14 @@ class ProtocolReaderImpl implements ProtocolReader {
return reader.readStruct(REQUEST, Request.class);
}
public boolean hasSubscriptionAck() throws IOException {
return reader.hasStruct(SUBSCRIPTION_ACK);
}
public SubscriptionAck readSubscriptionAck() throws IOException {
return reader.readStruct(SUBSCRIPTION_ACK, SubscriptionAck.class);
}
public boolean hasSubscriptionUpdate() throws IOException {
return reader.hasStruct(SUBSCRIPTION_UPDATE);
}
@@ -86,6 +102,14 @@ class ProtocolReaderImpl implements ProtocolReader {
SubscriptionUpdate.class);
}
public boolean hasTransportAck() throws IOException {
return reader.hasStruct(TRANSPORT_ACK);
}
public TransportAck readTransportAck() throws IOException {
return reader.readStruct(TRANSPORT_ACK, TransportAck.class);
}
public boolean hasTransportUpdate() throws IOException {
return reader.hasStruct(TRANSPORT_UPDATE);
}

View File

@@ -5,24 +5,24 @@ import static net.sf.briar.api.protocol.Types.ACK;
import static net.sf.briar.api.protocol.Types.GROUP;
import static net.sf.briar.api.protocol.Types.OFFER;
import static net.sf.briar.api.protocol.Types.REQUEST;
import static net.sf.briar.api.protocol.Types.SUBSCRIPTION_ACK;
import static net.sf.briar.api.protocol.Types.SUBSCRIPTION_UPDATE;
import static net.sf.briar.api.protocol.Types.TRANSPORT;
import static net.sf.briar.api.protocol.Types.TRANSPORT_ACK;
import static net.sf.briar.api.protocol.Types.TRANSPORT_UPDATE;
import java.io.IOException;
import java.io.OutputStream;
import java.util.BitSet;
import java.util.Map.Entry;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolWriter;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.serial.SerialComponent;
import net.sf.briar.api.serial.Writer;
@@ -103,48 +103,40 @@ class ProtocolWriterImpl implements ProtocolWriter {
if(flush) out.flush();
}
public void writeSubscriptionUpdate(SubscriptionUpdate s)
throws IOException {
w.writeStructId(SUBSCRIPTION_UPDATE);
// Holes
w.writeMapStart();
for(Entry<GroupId, GroupId> e : s.getHoles().entrySet()) {
w.writeBytes(e.getKey().getBytes());
w.writeBytes(e.getValue().getBytes());
}
w.writeMapEnd();
// Subscriptions
w.writeMapStart();
for(Entry<Group, Long> e : s.getSubscriptions().entrySet()) {
writeGroup(w, e.getKey());
w.writeInt64(e.getValue());
}
w.writeMapEnd();
// Expiry time
w.writeInt64(s.getExpiryTime());
// Timestamp
w.writeInt64(s.getTimestamp());
public void writeSubscriptionAck(SubscriptionAck a) throws IOException {
w.writeStructId(SUBSCRIPTION_ACK);
w.writeInt64(a.getVersionNumber());
if(flush) out.flush();
}
private void writeGroup(Writer w, Group g) throws IOException {
w.writeStructId(GROUP);
w.writeString(g.getName());
byte[] publicKey = g.getPublicKey();
if(publicKey == null) w.writeNull();
else w.writeBytes(publicKey);
public void writeSubscriptionUpdate(SubscriptionUpdate s)
throws IOException {
w.writeStructId(SUBSCRIPTION_UPDATE);
w.writeListStart();
for(Group g : s.getGroups()) {
w.writeStructId(GROUP);
w.writeString(g.getName());
byte[] publicKey = g.getPublicKey();
if(publicKey == null) w.writeNull();
else w.writeBytes(publicKey);
}
w.writeListEnd();
w.writeInt64(s.getVersionNumber());
if(flush) out.flush();
}
public void writeTransportAck(TransportAck a) throws IOException {
w.writeStructId(TRANSPORT_ACK);
w.writeBytes(a.getId().getBytes());
w.writeInt64(a.getVersionNumber());
if(flush) out.flush();
}
public void writeTransportUpdate(TransportUpdate t) throws IOException {
w.writeStructId(TRANSPORT_UPDATE);
w.writeListStart();
for(Transport p : t.getTransports()) {
w.writeStructId(TRANSPORT);
w.writeBytes(p.getId().getBytes());
w.writeMap(p.getProperties());
}
w.writeListEnd();
w.writeInt64(t.getTimestamp());
w.writeBytes(t.getId().getBytes());
w.writeMap(t.getProperties());
w.writeInt64(t.getVersionNumber());
if(flush) out.flush();
}

View File

@@ -1,24 +0,0 @@
package net.sf.briar.protocol;
import java.util.BitSet;
import net.sf.briar.api.protocol.Request;
class RequestImpl implements Request {
private final BitSet requested;
private final int length;
RequestImpl(BitSet requested, int length) {
this.requested = requested;
this.length = length;
}
public BitSet getBitmap() {
return requested;
}
public int getLength() {
return length;
}
}

View File

@@ -7,7 +7,6 @@ import java.io.IOException;
import java.util.BitSet;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.serial.Consumer;
import net.sf.briar.api.serial.CountingConsumer;
@@ -16,20 +15,14 @@ import net.sf.briar.api.serial.StructReader;
class RequestReader implements StructReader<Request> {
private final PacketFactory packetFactory;
RequestReader(PacketFactory packetFactory) {
this.packetFactory = packetFactory;
}
public Request readStruct(Reader r) throws IOException {
// Initialise the consumer
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
// Read the data
r.addConsumer(counting);
r.readStructId(REQUEST);
// There may be up to 7 bits of padding at the end of the bitmap
int padding = r.readUint7();
if(padding > 7) throw new FormatException();
// Read the bitmap
byte[] bitmap = r.readBytes(MAX_PACKET_LENGTH);
r.removeConsumer(counting);
// Convert the bitmap into a BitSet
@@ -41,6 +34,6 @@ class RequestReader implements StructReader<Request> {
if((bitmap[i] & bit) != 0) b.set(i * 8 + j);
}
}
return packetFactory.createRequest(b, length);
return new Request(b, length);
}
}

View File

@@ -0,0 +1,20 @@
package net.sf.briar.protocol;
import static net.sf.briar.api.protocol.Types.SUBSCRIPTION_ACK;
import java.io.IOException;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.SubscriptionAck;
import net.sf.briar.api.serial.Reader;
import net.sf.briar.api.serial.StructReader;
class SubscriptionAckReader implements StructReader<SubscriptionAck> {
public SubscriptionAck readStruct(Reader r) throws IOException {
r.readStructId(SUBSCRIPTION_ACK);
long version = r.readInt64();
if(version < 0L) throw new FormatException();
return new SubscriptionAck(version);
}
}

View File

@@ -1,38 +0,0 @@
package net.sf.briar.protocol;
import java.util.Map;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.SubscriptionUpdate;
class SubscriptionUpdateImpl implements SubscriptionUpdate {
private final Map<GroupId, GroupId> holes;
private final Map<Group, Long> subs;
private final long expiry, timestamp;
SubscriptionUpdateImpl(Map<GroupId, GroupId> holes, Map<Group, Long> subs,
long expiry, long timestamp) {
this.holes = holes;
this.subs = subs;
this.expiry = expiry;
this.timestamp = timestamp;
}
public Map<GroupId, GroupId> getHoles() {
return holes;
}
public Map<Group, Long> getSubscriptions() {
return subs;
}
public long getExpiryTime() {
return expiry;
}
public long getTimestamp() {
return timestamp;
}
}

View File

@@ -5,15 +5,12 @@ import static net.sf.briar.api.protocol.Types.GROUP;
import static net.sf.briar.api.protocol.Types.SUBSCRIPTION_UPDATE;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import java.util.List;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.Group;
import net.sf.briar.api.protocol.GroupId;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.UniqueId;
import net.sf.briar.api.serial.Consumer;
import net.sf.briar.api.serial.CountingConsumer;
import net.sf.briar.api.serial.Reader;
@@ -22,46 +19,25 @@ import net.sf.briar.api.serial.StructReader;
class SubscriptionUpdateReader implements StructReader<SubscriptionUpdate> {
private final StructReader<Group> groupReader;
private final PacketFactory packetFactory;
SubscriptionUpdateReader(StructReader<Group> groupReader,
PacketFactory packetFactory) {
SubscriptionUpdateReader(StructReader<Group> groupReader) {
this.groupReader = groupReader;
this.packetFactory = packetFactory;
}
public SubscriptionUpdate readStruct(Reader r) throws IOException {
// Initialise the consumer
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
// Read the data
r.addConsumer(counting);
r.readStructId(SUBSCRIPTION_UPDATE);
// Holes
Map<GroupId, GroupId> holes = new HashMap<GroupId, GroupId>();
r.setMaxBytesLength(UniqueId.LENGTH);
r.readMapStart();
while(!r.hasMapEnd()) {
byte[] start = r.readBytes();
if(start.length != UniqueId.LENGTH) throw new FormatException();
byte[] end = r.readBytes();
if(end.length != UniqueId.LENGTH)throw new FormatException();
holes.put(new GroupId(start), new GroupId(end));
}
r.readMapEnd();
r.resetMaxBytesLength();
// Subscriptions
// Read the subscriptions
r.addStructReader(GROUP, groupReader);
Map<Group, Long> subs = r.readMap(Group.class, Long.class);
List<Group> subs = r.readList(Group.class);
r.removeStructReader(GROUP);
// Expiry time
long expiry = r.readInt64();
if(expiry < 0L) throw new FormatException();
// Timestamp
long timestamp = r.readInt64();
if(timestamp < 0L) throw new FormatException();
// Read the version number
long version = r.readInt64();
if(version < 0L) throw new FormatException();
r.removeConsumer(counting);
// Build and return the subscription update
return packetFactory.createSubscriptionUpdate(holes, subs, expiry,
timestamp);
subs = Collections.unmodifiableList(subs);
return new SubscriptionUpdate(subs, version);
}
}

View File

@@ -0,0 +1,24 @@
package net.sf.briar.protocol;
import static net.sf.briar.api.protocol.Types.TRANSPORT_ACK;
import java.io.IOException;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.TransportAck;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.UniqueId;
import net.sf.briar.api.serial.Reader;
import net.sf.briar.api.serial.StructReader;
class TransportAckReader implements StructReader<TransportAck> {
public TransportAck readStruct(Reader r) throws IOException {
r.readStructId(TRANSPORT_ACK);
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length < UniqueId.LENGTH) throw new FormatException();
long version = r.readInt64();
if(version < 0L) throw new FormatException();
return new TransportAck(new TransportId(b), version);
}
}

View File

@@ -1,26 +0,0 @@
package net.sf.briar.protocol;
import java.util.Collection;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.protocol.TransportUpdate;
class TransportUpdateImpl implements TransportUpdate {
private final Collection<Transport> transports;
private final long timestamp;
TransportUpdateImpl(Collection<Transport> transports,
long timestamp) {
this.transports = transports;
this.timestamp = timestamp;
}
public Collection<Transport> getTransports() {
return transports;
}
public long getTimestamp() {
return timestamp;
}
}

View File

@@ -3,19 +3,13 @@ package net.sf.briar.protocol;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PROPERTIES_PER_TRANSPORT;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PROPERTY_LENGTH;
import static net.sf.briar.api.protocol.ProtocolConstants.MAX_TRANSPORTS;
import static net.sf.briar.api.protocol.Types.TRANSPORT;
import static net.sf.briar.api.protocol.Types.TRANSPORT_UPDATE;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.protocol.PacketFactory;
import net.sf.briar.api.protocol.Transport;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.protocol.TransportId;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UniqueId;
@@ -26,50 +20,25 @@ import net.sf.briar.api.serial.StructReader;
class TransportUpdateReader implements StructReader<TransportUpdate> {
private final PacketFactory packetFactory;
private final StructReader<Transport> transportReader;
TransportUpdateReader(PacketFactory packetFactory) {
this.packetFactory = packetFactory;
transportReader = new TransportReader();
}
public TransportUpdate readStruct(Reader r) throws IOException {
// Initialise the consumer
Consumer counting = new CountingConsumer(MAX_PACKET_LENGTH);
// Read the data
r.addConsumer(counting);
r.readStructId(TRANSPORT_UPDATE);
r.addStructReader(TRANSPORT, transportReader);
Collection<Transport> transports = r.readList(Transport.class);
r.removeStructReader(TRANSPORT);
if(transports.size() > MAX_TRANSPORTS) throw new FormatException();
long timestamp = r.readInt64();
// Read the transport ID
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length < UniqueId.LENGTH) throw new FormatException();
TransportId id = new TransportId(b);
// Read the transport properties
r.setMaxStringLength(MAX_PROPERTY_LENGTH);
Map<String, String> m = r.readMap(String.class, String.class);
r.resetMaxStringLength();
if(m.size() > MAX_PROPERTIES_PER_TRANSPORT)
throw new FormatException();
// Read the version number
long version = r.readInt64();
if(version < 0L) throw new FormatException();
r.removeConsumer(counting);
// Check for duplicate IDs
Set<TransportId> ids = new HashSet<TransportId>();
for(Transport t : transports) {
if(!ids.add(t.getId())) throw new FormatException();
}
// Build and return the transport update
return packetFactory.createTransportUpdate(transports, timestamp);
}
private static class TransportReader implements StructReader<Transport> {
public Transport readStruct(Reader r) throws IOException {
r.readStructId(TRANSPORT);
// Read the ID
byte[] b = r.readBytes(UniqueId.LENGTH);
if(b.length != UniqueId.LENGTH) throw new FormatException();
TransportId id = new TransportId(b);
// Read the properties
r.setMaxStringLength(MAX_PROPERTY_LENGTH);
Map<String, String> m = r.readMap(String.class, String.class);
r.resetMaxStringLength();
if(m.size() > MAX_PROPERTIES_PER_TRANSPORT)
throw new FormatException();
return new Transport(id, m);
}
return new TransportUpdate(id, new TransportProperties(m), version);
}
}

View File

@@ -27,10 +27,10 @@ import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseEvent;
import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.db.event.MessageAddedEvent;
import net.sf.briar.api.db.event.MessageReceivedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
import net.sf.briar.api.db.event.TransportsUpdatedEvent;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.Message;
@@ -55,6 +55,7 @@ import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.util.ByteUtils;
// FIXME: Read and write subscription and transport acks
abstract class DuplexConnection implements DatabaseListener {
private static final Logger LOG =
@@ -132,7 +133,7 @@ abstract class DuplexConnection implements DatabaseListener {
if(affected.contains(contactId)) {
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
} else if(e instanceof LocalTransportsUpdatedEvent) {
} else if(e instanceof TransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdate());
}
}
@@ -556,8 +557,9 @@ abstract class DuplexConnection implements DatabaseListener {
public void run() {
try {
TransportUpdate t = db.generateTransportUpdate(contactId);
if(t != null) writerTasks.add(new WriteTransportUpdate(t));
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId);
if(t != null) writerTasks.add(new WriteTransportUpdates(t));
} catch(DbException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
}
@@ -565,18 +567,18 @@ abstract class DuplexConnection implements DatabaseListener {
}
// This task runs on the writer thread
private class WriteTransportUpdate implements Runnable {
private class WriteTransportUpdates implements Runnable {
private final TransportUpdate update;
private final Collection<TransportUpdate> updates;
private WriteTransportUpdate(TransportUpdate update) {
this.update = update;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() {
assert writer != null;
try {
writer.writeTransportUpdate(update);
for(TransportUpdate t : updates) writer.writeTransportUpdate(t);
} catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
dispose(true, true);

View File

@@ -30,6 +30,7 @@ import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionRegistry;
import net.sf.briar.util.ByteUtils;
// FIXME: Read subscription and transport acks
class IncomingSimplexConnection {
private static final Logger LOG =

View File

@@ -25,6 +25,7 @@ import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.util.ByteUtils;
// FIXME: Write subscription and transport acks
class OutgoingSimplexConnection {
private static final Logger LOG =
@@ -66,15 +67,15 @@ class OutgoingSimplexConnection {
// There should be enough space for a packet
long capacity = conn.getRemainingCapacity();
if(capacity < MAX_PACKET_LENGTH) throw new EOFException();
// Write a transport update
TransportUpdate t = db.generateTransportUpdate(contactId);
if(t != null) writer.writeTransportUpdate(t);
// If there's space, write a subscription update
capacity = conn.getRemainingCapacity();
if(capacity >= MAX_PACKET_LENGTH) {
SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId);
if(s != null) writer.writeSubscriptionUpdate(s);
// Write transport updates. FIXME: Check for space
Collection<TransportUpdate> updates =
db.generateTransportUpdates(contactId);
if(updates != null) {
for(TransportUpdate t : updates) writer.writeTransportUpdate(t);
}
// Write a subscription update. FIXME: Check for space
SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId);
if(s != null) writer.writeSubscriptionUpdate(s);
// Write acks until you can't write acks no more
capacity = conn.getRemainingCapacity();
int maxMessages = writer.getMaxMessagesForAck(capacity);

View File

@@ -98,7 +98,7 @@ class ReaderImpl implements Reader {
if(!consumers.remove(c)) throw new IllegalArgumentException();
}
public void addStructReader(int id, StructReader<?> o) {
public void addStructReader(int id, StructReader<?> r) {
if(id < 0 || id > 255) throw new IllegalArgumentException();
if(structReaders.length < id + 1) {
int len = Math.min(256, Math.max(id + 1, structReaders.length * 2));
@@ -107,7 +107,7 @@ class ReaderImpl implements Reader {
structReaders.length);
structReaders = newStructReaders;
}
structReaders[id] = o;
structReaders[id] = r;
}
public void removeStructReader(int id) {