Removed TransportUpdate and TransportAck.

This commit is contained in:
akwizgran
2016-01-21 18:18:59 +00:00
parent ea02caf577
commit cd175fd119
20 changed files with 20 additions and 1632 deletions

View File

@@ -2,7 +2,6 @@ package org.briarproject.db;
import org.briarproject.api.Settings;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.DbException;
@@ -19,8 +18,6 @@ import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageStatus;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.transport.TransportKeys;
import java.io.IOException;
@@ -302,22 +299,6 @@ interface Database<T> {
*/
Collection<LocalAuthor> getLocalAuthors(T txn) throws DbException;
/**
* Returns the local transport properties for all transports.
* <p>
* Locking: read.
*/
Map<TransportId, TransportProperties> getLocalProperties(T txn)
throws DbException;
/**
* Returns the local transport properties for the given transport.
* <p>
* Locking: read.
*/
TransportProperties getLocalProperties(T txn, TransportId t)
throws DbException;
/**
* Returns the metadata for all messages in the given group.
* <p>
@@ -403,14 +384,6 @@ interface Database<T> {
*/
byte[] getRawMessage(T txn, MessageId m) throws DbException;
/**
* Returns all remote properties for the given transport.
* <p>
* Locking: read.
*/
Map<ContactId, TransportProperties> getRemoteProperties(T txn,
TransportId t) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact and have been requested by the contact, up to the given
@@ -452,15 +425,6 @@ interface Database<T> {
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c,
int maxLatency) throws DbException;
/**
* Returns a collection of transport acks for the given contact, or null if
* no acks are due.
* <p>
* Locking: write.
*/
Collection<TransportAck> getTransportAcks(T txn, ContactId c)
throws DbException;
/**
* Returns all transport keys for the given transport.
* <p>
@@ -476,16 +440,6 @@ interface Database<T> {
*/
Map<TransportId, Integer> getTransportLatencies(T txn) throws DbException;
/**
* Returns a collection of transport updates for the given contact and
* updates their expiry times using the given latency, or returns null if
* no updates are due.
* <p>
* Locking: write.
*/
Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c,
int maxLatency) throws DbException;
/**
* Returns the IDs of all contacts to which the given group is visible.
* <p>
@@ -529,15 +483,6 @@ interface Database<T> {
void mergeGroupMetadata(T txn, GroupId g, Metadata meta)
throws DbException;
/**
* Merges the given properties with the existing local properties for the
* given transport.
* <p>
* Locking: write.
*/
void mergeLocalProperties(T txn, TransportId t, TransportProperties p)
throws DbException;
/*
* Merges the given metadata with the existing metadata for the given
* message.
@@ -689,26 +634,6 @@ interface Database<T> {
boolean setGroups(T txn, ContactId c, Collection<Group> groups,
long version) throws DbException;
/**
* Sets the remote transport properties for the given contact, replacing
* any existing properties.
* <p>
* Locking: write.
*/
void setRemoteProperties(T txn, ContactId c,
Map<TransportId, TransportProperties> p) throws DbException;
/**
* Updates the remote transport properties for the given contact and the
* given transport, replacing any existing properties, and returns true,
* unless an update with an equal or higher version number has already been
* received from the contact.
* <p>
* Locking: write.
*/
boolean setRemoteProperties(T txn, ContactId c, TransportId t,
TransportProperties p, long version) throws DbException;
/**
* Records a subscription ack from the given contact for the given version,
* unless the contact has already acked an equal or higher version.
@@ -718,15 +643,6 @@ interface Database<T> {
void setSubscriptionUpdateAcked(T txn, ContactId c, long version)
throws DbException;
/**
* Records a transport ack from the give contact for the given version,
* unless the contact has already acked an equal or higher version.
* <p>
* Locking: write.
*/
void setTransportUpdateAcked(T txn, ContactId c, TransportId t,
long version) throws DbException;
/**
* Makes a group visible or invisible to future contacts by default.
* <p>

View File

@@ -2,7 +2,6 @@ package org.briarproject.db;
import org.briarproject.api.Settings;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.db.ContactExistsException;
@@ -19,7 +18,6 @@ import org.briarproject.api.db.NoSuchTransportException;
import org.briarproject.api.db.StorageStatus;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageAddedEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
@@ -28,7 +26,6 @@ import org.briarproject.api.event.MessageValidatedEvent;
import org.briarproject.api.event.MessagesAckedEvent;
import org.briarproject.api.event.MessagesSentEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.event.SettingsUpdatedEvent;
import org.briarproject.api.event.SubscriptionAddedEvent;
import org.briarproject.api.event.SubscriptionRemovedEvent;
@@ -49,8 +46,6 @@ import org.briarproject.api.sync.Offer;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.transport.TransportKeys;
import java.io.IOException;
@@ -475,47 +470,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
public Collection<TransportAck> generateTransportAcks(ContactId c)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<TransportAck> acks = db.getTransportAcks(txn, c);
db.commitTransaction(txn);
return acks;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public Collection<TransportUpdate> generateTransportUpdates(ContactId c,
int maxLatency) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<TransportUpdate> updates =
db.getTransportUpdates(txn, c, maxLatency);
db.commitTransaction(txn);
return updates;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public Collection<Group> getAvailableGroups(ClientId c) throws DbException {
lock.readLock().lock();
try {
@@ -679,45 +633,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
public Map<TransportId, TransportProperties> getLocalProperties()
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Map<TransportId, TransportProperties> properties =
db.getLocalProperties(txn);
db.commitTransaction(txn);
return properties;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
TransportProperties properties = db.getLocalProperties(txn, t);
db.commitTransaction(txn);
return properties;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Collection<MessageId> getMessagesToValidate(ClientId c)
throws DbException {
lock.readLock().lock();
@@ -840,25 +755,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
lock.readLock().lock();
try {
T txn = db.startTransaction();
try {
Map<ContactId, TransportProperties> properties =
db.getRemoteProperties(txn, t);
db.commitTransaction(txn);
return properties;
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.readLock().unlock();
}
}
public Settings getSettings(String namespace) throws DbException {
lock.readLock().lock();
try {
@@ -992,30 +888,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
}
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
boolean changed = false;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsTransport(txn, t))
throw new NoSuchTransportException();
if (!p.equals(db.getLocalProperties(txn, t))) {
db.mergeLocalProperties(txn, t, p);
changed = true;
}
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (changed) eventBus.broadcast(new LocalTransportsUpdatedEvent());
}
public void mergeMessageMetadata(MessageId m, Metadata meta)
throws DbException {
lock.writeLock().lock();
@@ -1208,52 +1080,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (updated) eventBus.broadcast(new RemoteSubscriptionsUpdatedEvent(c));
}
public void receiveTransportAck(ContactId c, TransportAck a)
throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (!db.containsTransport(txn, a.getId()))
throw new NoSuchTransportException();
db.setTransportUpdateAcked(txn, c, a.getId(), a.getVersion());
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void receiveTransportUpdate(ContactId c, TransportUpdate u)
throws DbException {
boolean updated;
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
TransportId t = u.getId();
TransportProperties p = u.getProperties();
long version = u.getVersion();
updated = db.setRemoteProperties(txn, c, t, p, version);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
if (updated)
eventBus.broadcast(new RemoteTransportsUpdatedEvent(c, u.getId()));
}
public void removeContact(ContactId c) throws DbException {
lock.writeLock().lock();
try {
@@ -1390,25 +1216,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
eventBus.broadcast(new MessageValidatedEvent(m, c, false, valid));
}
public void setRemoteProperties(ContactId c,
Map<TransportId, TransportProperties> p) throws DbException {
lock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
db.setRemoteProperties(txn, c, p);
db.commitTransaction(txn);
} catch (DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
lock.writeLock().unlock();
}
}
public void setReorderingWindow(ContactId c, TransportId t,
long rotationPeriod, long base, byte[] bitmap) throws DbException {
lock.writeLock().lock();

View File

@@ -2,7 +2,6 @@ package org.briarproject.db;
import org.briarproject.api.Settings;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.contact.Contact;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.crypto.SecretKey;
@@ -21,8 +20,6 @@ import org.briarproject.api.sync.MessageId;
import org.briarproject.api.sync.MessageStatus;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.system.Clock;
import org.briarproject.api.transport.IncomingKeys;
import org.briarproject.api.transport.OutgoingKeys;
@@ -210,64 +207,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " maxLatency INT NOT NULL,"
+ " PRIMARY KEY (transportId))";
private static final String CREATE_TRANSPORT_CONFIGS =
"CREATE TABLE transportConfigs"
+ " (transportId VARCHAR NOT NULL,"
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (transportId, key),"
+ " FOREIGN KEY (transportId)"
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_TRANSPORT_PROPS =
"CREATE TABLE transportProperties"
+ " (transportId VARCHAR NOT NULL,"
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (transportId, key),"
+ " FOREIGN KEY (transportId)"
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_TRANSPORT_VERSIONS =
"CREATE TABLE transportVersions"
+ " (contactId INT NOT NULL,"
+ " transportId VARCHAR NOT NULL,"
+ " localVersion BIGINT NOT NULL,"
+ " localAcked BIGINT NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " txCount INT NOT NULL,"
+ " PRIMARY KEY (contactId, transportId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (transportId)"
+ " REFERENCES transports (transportId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_CONTACT_TRANSPORT_PROPS =
"CREATE TABLE contactTransportProperties"
+ " (contactId INT NOT NULL,"
+ " transportId VARCHAR NOT NULL," // Not a foreign key
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (contactId, transportId, key),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_CONTACT_TRANSPORT_VERSIONS =
"CREATE TABLE contactTransportVersions"
+ " (contactId INT NOT NULL,"
+ " transportId VARCHAR NOT NULL," // Not a foreign key
+ " remoteVersion BIGINT NOT NULL,"
+ " remoteAcked BOOLEAN NOT NULL,"
+ " PRIMARY KEY (contactId, transportId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_INCOMING_KEYS =
"CREATE TABLE incomingKeys"
+ " (contactId INT NOT NULL,"
@@ -405,11 +344,6 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(insertTypeNames(CREATE_OFFERS));
s.executeUpdate(insertTypeNames(CREATE_STATUSES));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORTS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_CONFIGS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_PROPS));
s.executeUpdate(insertTypeNames(CREATE_TRANSPORT_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_CONTACT_TRANSPORT_PROPS));
s.executeUpdate(insertTypeNames(CREATE_CONTACT_TRANSPORT_VERSIONS));
s.executeUpdate(insertTypeNames(CREATE_INCOMING_KEYS));
s.executeUpdate(insertTypeNames(CREATE_OUTGOING_KEYS));
s.close();
@@ -629,31 +563,6 @@ abstract class JdbcDatabase implements Database<Connection> {
affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
// Create a transport version row for each local transport
sql = "SELECT transportId FROM transports";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Collection<String> transports = new ArrayList<String>();
while (rs.next()) transports.add(rs.getString(1));
rs.close();
ps.close();
if (transports.isEmpty()) return c;
sql = "INSERT INTO transportVersions (contactId, transportId,"
+ " localVersion, localAcked, expiry, txCount)"
+ " VALUES (?, ?, 1, 0, 0, 0)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for (String t : transports) {
ps.setString(2, t);
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != transports.size())
throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
return c;
} catch (SQLException e) {
tryToClose(rs);
@@ -851,30 +760,6 @@ abstract class JdbcDatabase implements Database<Connection> {
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
// Create a transport version row for each contact
sql = "SELECT contactId FROM contacts";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Collection<Integer> contacts = new ArrayList<Integer>();
while (rs.next()) contacts.add(rs.getInt(1));
rs.close();
ps.close();
if (contacts.isEmpty()) return true;
sql = "INSERT INTO transportVersions (contactId, transportId,"
+ " localVersion, localAcked, expiry, txCount)"
+ " VALUES (?, ?, 1, 0, 0, 0)";
ps = txn.prepareStatement(sql);
ps.setString(2, t.getString());
for (Integer c : contacts) {
ps.setInt(1, c);
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != contacts.size())
throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
return true;
} catch (SQLException e) {
tryToClose(ps);
@@ -1415,62 +1300,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Map<TransportId, TransportProperties> getLocalProperties(
Connection txn) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT transportId, key, value"
+ " FROM transportProperties"
+ " ORDER BY transportId";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Map<TransportId, TransportProperties> properties =
new HashMap<TransportId, TransportProperties>();
TransportId lastId = null;
TransportProperties p = null;
while (rs.next()) {
TransportId id = new TransportId(rs.getString(1));
String key = rs.getString(2), value = rs.getString(3);
if (!id.equals(lastId)) {
p = new TransportProperties();
properties.put(id, p);
lastId = id;
}
p.put(key, value);
}
rs.close();
ps.close();
return Collections.unmodifiableMap(properties);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public TransportProperties getLocalProperties(Connection txn, TransportId t)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT key, value FROM transportProperties"
+ " WHERE transportId = ?";
ps = txn.prepareStatement(sql);
ps.setString(1, t.getString());
rs = ps.executeQuery();
TransportProperties p = new TransportProperties();
while (rs.next()) p.put(rs.getString(1), rs.getString(2));
rs.close();
ps.close();
return p;
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Map<MessageId, Metadata> getMessageMetadata(Connection txn,
GroupId g) throws DbException {
PreparedStatement ps = null;
@@ -1773,42 +1602,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Map<ContactId, TransportProperties> getRemoteProperties(
Connection txn, TransportId t) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT contactId, key, value"
+ " FROM contactTransportProperties"
+ " WHERE transportId = ?"
+ " ORDER BY contactId";
ps = txn.prepareStatement(sql);
ps.setString(1, t.getString());
rs = ps.executeQuery();
Map<ContactId, TransportProperties> properties =
new HashMap<ContactId, TransportProperties>();
ContactId lastId = null;
TransportProperties p = null;
while (rs.next()) {
ContactId id = new ContactId(rs.getInt(1));
String key = rs.getString(2), value = rs.getString(3);
if (!id.equals(lastId)) {
p = new TransportProperties();
properties.put(id, p);
lastId = id;
}
p.put(key, value);
}
rs.close();
ps.close();
return Collections.unmodifiableMap(properties);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public Collection<MessageId> getRequestedMessagesToSend(Connection txn,
ContactId c, int maxLength) throws DbException {
long now = clock.currentTimeMillis();
@@ -1996,48 +1789,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<TransportAck> getTransportAcks(Connection txn,
ContactId c) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT transportId, remoteVersion"
+ " FROM contactTransportVersions"
+ " WHERE contactId = ? AND remoteAcked = FALSE";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
List<TransportAck> acks = new ArrayList<TransportAck>();
while (rs.next()) {
TransportId id = new TransportId(rs.getString(1));
acks.add(new TransportAck(id, rs.getLong(2)));
}
rs.close();
ps.close();
if (acks.isEmpty()) return null;
sql = "UPDATE contactTransportVersions SET remoteAcked = TRUE"
+ " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for (TransportAck a : acks) {
ps.setString(2, a.getId().getString());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != acks.size())
throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
return Collections.unmodifiableList(acks);
} catch (SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public Map<ContactId, TransportKeys> getTransportKeys(Connection txn,
TransportId t) throws DbException {
PreparedStatement ps = null;
@@ -2123,72 +1874,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Collection<TransportUpdate> getTransportUpdates(Connection txn,
ContactId c, int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT tp.transportId, key, value, localVersion,"
+ " txCount"
+ " FROM transportProperties AS tp"
+ " JOIN transportVersions AS tv"
+ " ON tp.transportId = tv.transportId"
+ " WHERE tv.contactId = ?"
+ " AND localVersion > localAcked"
+ " AND expiry < ?"
+ " ORDER BY tp.transportId";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setLong(2, now);
rs = ps.executeQuery();
List<TransportUpdate> updates = new ArrayList<TransportUpdate>();
TransportId lastId = null;
TransportProperties p = null;
List<Integer> txCounts = new ArrayList<Integer>();
while (rs.next()) {
TransportId id = new TransportId(rs.getString(1));
String key = rs.getString(2), value = rs.getString(3);
long version = rs.getLong(4);
int txCount = rs.getInt(5);
if (!id.equals(lastId)) {
p = new TransportProperties();
updates.add(new TransportUpdate(id, p, version));
txCounts.add(txCount);
lastId = id;
}
p.put(key, value);
}
rs.close();
ps.close();
if (updates.isEmpty()) return null;
sql = "UPDATE transportVersions"
+ " SET expiry = ?, txCount = txCount + 1"
+ " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(2, c.getInt());
int i = 0;
for (TransportUpdate u : updates) {
int txCount = txCounts.get(i++);
ps.setLong(1, calculateExpiry(now, maxLatency, txCount));
ps.setString(3, u.getId().getString());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != updates.size())
throw new DbStateException();
for (i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
return Collections.unmodifiableList(updates);
} catch (SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public Collection<ContactId> getVisibility(Connection txn, GroupId g)
throws DbException {
PreparedStatement ps = null;
@@ -2282,73 +1967,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void mergeLocalProperties(Connection txn, TransportId t,
TransportProperties p) throws DbException {
// Merge the new properties with the existing ones
mergeStringMap(txn, t, p, "transportProperties");
// Bump the transport version
PreparedStatement ps = null;
try {
String sql = "UPDATE transportVersions"
+ " SET localVersion = localVersion + 1, expiry = 0"
+ " WHERE transportId = ?";
ps = txn.prepareStatement(sql);
ps.setString(1, t.getString());
ps.executeUpdate();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
private void mergeStringMap(Connection txn, TransportId t,
Map<String, String> m, String tableName) throws DbException {
PreparedStatement ps = null;
try {
// Update any properties that already exist
String sql = "UPDATE " + tableName + " SET value = ?"
+ " WHERE transportId = ? AND key = ?";
ps = txn.prepareStatement(sql);
ps.setString(2, t.getString());
for (Entry<String, String> e : m.entrySet()) {
ps.setString(1, e.getValue());
ps.setString(3, e.getKey());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != m.size()) throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] < 0) throw new DbStateException();
if (batchAffected[i] > 1) throw new DbStateException();
}
// Insert any properties that don't already exist
sql = "INSERT INTO " + tableName + " (transportId, key, value)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setString(1, t.getString());
int updateIndex = 0, inserted = 0;
for (Entry<String, String> e : m.entrySet()) {
if (batchAffected[updateIndex] == 0) {
ps.setString(2, e.getKey());
ps.setString(3, e.getValue());
ps.addBatch();
inserted++;
}
updateIndex++;
}
batchAffected = ps.executeBatch();
if (batchAffected.length != inserted) throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void mergeGroupMetadata(Connection txn, GroupId g, Metadata meta)
throws DbException {
mergeMetadata(txn, g.getBytes(), meta, "groupMetadata", "groupId");
@@ -2894,127 +2512,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setRemoteProperties(Connection txn, ContactId c,
Map<TransportId, TransportProperties> p) throws DbException {
PreparedStatement ps = null;
try {
// Delete the existing properties, if any
String sql = "DELETE FROM contactTransportProperties"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.executeUpdate();
ps.close();
// Store the new properties
sql = "INSERT INTO contactTransportProperties"
+ " (contactId, transportId, key, value)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
int batchSize = 0;
for (Entry<TransportId, TransportProperties> e : p.entrySet()) {
ps.setString(2, e.getKey().getString());
for (Entry<String, String> e1 : e.getValue().entrySet()) {
ps.setString(3, e1.getKey());
ps.setString(4, e1.getValue());
ps.addBatch();
batchSize++;
}
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != batchSize) throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public boolean setRemoteProperties(Connection txn, ContactId c,
TransportId t, TransportProperties p, long version)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// Find the existing version, if any
String sql = "SELECT NULL FROM contactTransportVersions"
+ " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, t.getString());
rs = ps.executeQuery();
boolean found = rs.next();
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
// Mark the update as needing to be acked
if (found) {
// The row exists - update it
sql = "UPDATE contactTransportVersions"
+ " SET remoteVersion = ?, remoteAcked = FALSE"
+ " WHERE contactId = ? AND transportId = ?"
+ " AND remoteVersion < ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, version);
ps.setInt(2, c.getInt());
ps.setString(3, t.getString());
ps.setLong(4, version);
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
// Return false if the update is obsolete
if (affected == 0) return false;
} else {
// The row doesn't exist - create it
sql = "INSERT INTO contactTransportVersions (contactId,"
+ " transportId, remoteVersion, remoteAcked)"
+ " VALUES (?, ?, ?, FALSE)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, t.getString());
ps.setLong(3, version);
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
}
// Delete the existing properties, if any
sql = "DELETE FROM contactTransportProperties"
+ " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, t.getString());
ps.executeUpdate();
ps.close();
// Store the new properties, if any
if (p.isEmpty()) return true;
sql = "INSERT INTO contactTransportProperties"
+ " (contactId, transportId, key, value)"
+ " VALUES (?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setString(2, t.getString());
for (Entry<String, String> e : p.entrySet()) {
ps.setString(3, e.getKey());
ps.setString(4, e.getValue());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != p.size()) throw new DbStateException();
for (int i = 0; i < batchAffected.length; i++) {
if (batchAffected[i] != 1) throw new DbStateException();
}
ps.close();
return true;
} catch (SQLException e) {
tryToClose(ps);
tryToClose(rs);
throw new DbException(e);
}
}
public void setSubscriptionUpdateAcked(Connection txn, ContactId c,
long version) throws DbException {
PreparedStatement ps = null;
@@ -3036,28 +2533,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setTransportUpdateAcked(Connection txn, ContactId c,
TransportId t, long version) throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE transportVersions SET localAcked = ?"
+ " WHERE contactId = ? AND transportId = ?"
+ " AND localAcked < ? AND localVersion >= ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, version);
ps.setInt(2, c.getInt());
ps.setString(3, t.getString());
ps.setLong(4, version);
ps.setLong(5, version);
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps);
throw new DbException(e);
}
}
public void setVisibleToAll(Connection txn, GroupId g, boolean all)
throws DbException {
PreparedStatement ps = null;

View File

@@ -9,6 +9,7 @@ import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.db.DbException;
import org.briarproject.api.property.TransportPropertyManager;
import java.util.Collections;
import java.util.Map;
// Temporary facade during sync protocol refactoring
@@ -24,30 +25,33 @@ class TransportPropertyManagerImpl implements TransportPropertyManager {
@Override
public Map<TransportId, TransportProperties> getLocalProperties()
throws DbException {
return db.getLocalProperties();
// TODO
return Collections.emptyMap();
}
@Override
public TransportProperties getLocalProperties(TransportId t)
throws DbException {
return db.getLocalProperties(t);
// TODO
return new TransportProperties();
}
@Override
public Map<ContactId, TransportProperties> getRemoteProperties(
TransportId t) throws DbException {
return db.getRemoteProperties(t);
// TODO
return Collections.emptyMap();
}
@Override
public void mergeLocalProperties(TransportId t, TransportProperties p)
throws DbException {
db.mergeLocalProperties(t, p);
// TODO
}
@Override
public void setRemoteProperties(ContactId c,
Map<TransportId, TransportProperties> p) throws DbException {
db.setRemoteProperties(c, p);
// TODO
}
}

View File

@@ -9,13 +9,11 @@ import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventBus;
import org.briarproject.api.event.EventListener;
import org.briarproject.api.event.LocalSubscriptionsUpdatedEvent;
import org.briarproject.api.event.LocalTransportsUpdatedEvent;
import org.briarproject.api.event.MessageRequestedEvent;
import org.briarproject.api.event.MessageToAckEvent;
import org.briarproject.api.event.MessageToRequestEvent;
import org.briarproject.api.event.MessageValidatedEvent;
import org.briarproject.api.event.RemoteSubscriptionsUpdatedEvent;
import org.briarproject.api.event.RemoteTransportsUpdatedEvent;
import org.briarproject.api.event.ShutdownEvent;
import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.sync.Ack;
@@ -25,8 +23,6 @@ import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.api.system.Clock;
import java.io.IOException;
@@ -91,8 +87,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
eventBus.addListener(this);
try {
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateAck());
@@ -123,7 +117,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
now = clock.currentTimeMillis();
if (now >= nextRetxQuery) {
// Check for retransmittable packets
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
@@ -171,8 +164,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateOffer());
}
} else if (e instanceof LocalTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportUpdates());
} else if (e instanceof MessageRequestedEvent) {
if (((MessageRequestedEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateBatch());
@@ -189,11 +180,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateOffer());
}
} else if (e instanceof RemoteTransportsUpdatedEvent) {
RemoteTransportsUpdatedEvent r =
(RemoteTransportsUpdatedEvent) e;
if (r.getContactId().equals(contactId))
dbExecutor.execute(new GenerateTransportAcks());
} else if (e instanceof ShutdownEvent) {
interrupt();
} else if (e instanceof TransportRemovedEvent) {
@@ -414,76 +400,4 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if (acks != null) writerTasks.add(new WriteTransportAcks(acks));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements ThrowingRunnable<IOException> {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
}
}
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if (t != null) writerTasks.add(new WriteTransportUpdates(t));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates
implements ThrowingRunnable<IOException> {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
}
}
}

View File

@@ -19,8 +19,6 @@ import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import java.io.IOException;
import java.util.concurrent.Executor;
@@ -77,12 +75,6 @@ class IncomingSession implements SyncSession, EventListener {
} else if (packetReader.hasSubscriptionUpdate()) {
SubscriptionUpdate u = packetReader.readSubscriptionUpdate();
dbExecutor.execute(new ReceiveSubscriptionUpdate(u));
} else if (packetReader.hasTransportAck()) {
TransportAck a = packetReader.readTransportAck();
dbExecutor.execute(new ReceiveTransportAck(a));
} else if (packetReader.hasTransportUpdate()) {
TransportUpdate u = packetReader.readTransportUpdate();
dbExecutor.execute(new ReceiveTransportUpdate(u));
} else {
throw new FormatException();
}
@@ -216,40 +208,4 @@ class IncomingSession implements SyncSession, EventListener {
}
}
}
private class ReceiveTransportAck implements Runnable {
private final TransportAck ack;
private ReceiveTransportAck(TransportAck ack) {
this.ack = ack;
}
public void run() {
try {
db.receiveTransportAck(contactId, ack);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
private class ReceiveTransportUpdate implements Runnable {
private final TransportUpdate update;
private ReceiveTransportUpdate(TransportUpdate update) {
this.update = update;
}
public void run() {
try {
db.receiveTransportUpdate(contactId, update);
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
}

View File

@@ -1,8 +1,6 @@
package org.briarproject.sync;
import org.briarproject.api.FormatException;
import org.briarproject.api.TransportId;
import org.briarproject.api.TransportProperties;
import org.briarproject.api.UniqueId;
import org.briarproject.api.crypto.CryptoComponent;
import org.briarproject.api.data.BdfReader;
@@ -17,8 +15,6 @@ import org.briarproject.api.sync.PacketReader;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayInputStream;
@@ -26,21 +22,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTIES_PER_TRANSPORT;
import static org.briarproject.api.TransportPropertyConstants.MAX_PROPERTY_LENGTH;
import static org.briarproject.api.TransportPropertyConstants.MAX_TRANSPORT_ID_LENGTH;
import static org.briarproject.api.sync.PacketTypes.ACK;
import static org.briarproject.api.sync.PacketTypes.MESSAGE;
import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_UPDATE;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
@@ -205,66 +194,4 @@ class PacketReaderImpl implements PacketReader {
state = State.BUFFER_EMPTY;
return u;
}
public boolean hasTransportAck() throws IOException {
return !eof() && header[1] == TRANSPORT_ACK;
}
public TransportAck readTransportAck() throws IOException {
if (!hasTransportAck()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the transport ID and version
String idString = r.readString(MAX_TRANSPORT_ID_LENGTH);
if (idString.length() == 0) throw new FormatException();
TransportId id = new TransportId(idString);
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the transport ack
return new TransportAck(id, version);
}
public boolean hasTransportUpdate() throws IOException {
return !eof() && header[1] == TRANSPORT_UPDATE;
}
public TransportUpdate readTransportUpdate() throws IOException {
if (!hasTransportUpdate()) throw new FormatException();
// Set up the reader
InputStream bais = new ByteArrayInputStream(payload, 0, payloadLength);
BdfReader r = bdfReaderFactory.createReader(bais);
// Read the start of the payload
r.readListStart();
// Read the transport ID
String idString = r.readString(MAX_TRANSPORT_ID_LENGTH);
if (idString.length() == 0) throw new FormatException();
TransportId id = new TransportId(idString);
// Read the transport properties
Map<String, String> p = new HashMap<String, String>();
r.readDictionaryStart();
for (int i = 0; !r.hasDictionaryEnd(); i++) {
if (i == MAX_PROPERTIES_PER_TRANSPORT)
throw new FormatException();
String key = r.readString(MAX_PROPERTY_LENGTH);
String value = r.readString(MAX_PROPERTY_LENGTH);
p.put(key, value);
}
r.readDictionaryEnd();
// Read the version number
long version = r.readInteger();
if (version < 0) throw new FormatException();
// Read the end of the payload
r.readListEnd();
if (!r.eof()) throw new FormatException();
state = State.BUFFER_EMPTY;
// Build and return the transport update
return new TransportUpdate(id, new TransportProperties(p), version);
}
}

View File

@@ -12,8 +12,6 @@ import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.Request;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import org.briarproject.util.ByteUtils;
import java.io.ByteArrayOutputStream;
@@ -25,8 +23,6 @@ import static org.briarproject.api.sync.PacketTypes.OFFER;
import static org.briarproject.api.sync.PacketTypes.REQUEST;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_ACK;
import static org.briarproject.api.sync.PacketTypes.SUBSCRIPTION_UPDATE;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_ACK;
import static org.briarproject.api.sync.PacketTypes.TRANSPORT_UPDATE;
import static org.briarproject.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
import static org.briarproject.api.sync.SyncConstants.PROTOCOL_VERSION;
@@ -125,27 +121,6 @@ class PacketWriterImpl implements PacketWriter {
writePacket(SUBSCRIPTION_UPDATE);
}
public void writeTransportAck(TransportAck a) throws IOException {
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeString(a.getId().getString());
w.writeInteger(a.getVersion());
w.writeListEnd();
writePacket(TRANSPORT_ACK);
}
public void writeTransportUpdate(TransportUpdate u) throws IOException {
if (payload.size() != 0) throw new IllegalStateException();
BdfWriter w = bdfWriterFactory.createWriter(payload);
w.writeListStart();
w.writeString(u.getId().getString());
w.writeDictionary(u.getProperties());
w.writeInteger(u.getVersion());
w.writeListEnd();
writePacket(TRANSPORT_UPDATE);
}
public void flush() throws IOException {
out.flush();
}

View File

@@ -15,8 +15,6 @@ import org.briarproject.api.sync.PacketWriter;
import org.briarproject.api.sync.SubscriptionAck;
import org.briarproject.api.sync.SubscriptionUpdate;
import org.briarproject.api.sync.SyncSession;
import org.briarproject.api.sync.TransportAck;
import org.briarproject.api.sync.TransportUpdate;
import java.io.IOException;
import java.util.Collection;
@@ -68,7 +66,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
this.transportId = transportId;
this.maxLatency = maxLatency;
this.packetWriter = packetWriter;
outstandingQueries = new AtomicInteger(6); // One per type of packet
outstandingQueries = new AtomicInteger(4); // One per type of packet
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
@@ -76,8 +74,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
eventBus.addListener(this);
try {
// Start a query for each type of packet, in order of urgency
dbExecutor.execute(new GenerateTransportAcks());
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateAck());
@@ -264,78 +260,4 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateSubscriptionUpdate());
}
}
// This task runs on the database thread
private class GenerateTransportAcks implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport acks: " + (acks != null));
if (acks == null) decrementOutstandingQueries();
else writerTasks.add(new WriteTransportAcks(acks));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This tasks runs on the writer thread
private class WriteTransportAcks implements ThrowingRunnable<IOException> {
private final Collection<TransportAck> acks;
private WriteTransportAcks(Collection<TransportAck> acks) {
this.acks = acks;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportAck a : acks) packetWriter.writeTransportAck(a);
LOG.info("Sent transport acks");
dbExecutor.execute(new GenerateTransportAcks());
}
}
// This task runs on the database thread
private class GenerateTransportUpdates implements Runnable {
public void run() {
if (interrupted) return;
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);
if (LOG.isLoggable(INFO))
LOG.info("Generated transport updates: " + (t != null));
if (t == null) decrementOutstandingQueries();
else writerTasks.add(new WriteTransportUpdates(t));
} catch (DbException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
interrupt();
}
}
}
// This task runs on the writer thread
private class WriteTransportUpdates
implements ThrowingRunnable<IOException> {
private final Collection<TransportUpdate> updates;
private WriteTransportUpdates(Collection<TransportUpdate> updates) {
this.updates = updates;
}
public void run() throws IOException {
if (interrupted) return;
for (TransportUpdate u : updates)
packetWriter.writeTransportUpdate(u);
LOG.info("Sent transport updates");
dbExecutor.execute(new GenerateTransportUpdates());
}
}
}