Added support for local transport details. Each bundle contains the sender's latest transport details.

This commit is contained in:
akwizgran
2011-07-06 19:07:10 +01:00
parent b548820f77
commit 7fb589075d
7 changed files with 200 additions and 25 deletions

View File

@@ -1,5 +1,7 @@
package net.sf.briar.api.protocol;
import java.util.Map;
/** A bundle of acknowledgements, subscriptions, and batches of messages. */
public interface Bundle {
@@ -29,6 +31,12 @@ public interface Bundle {
/** Adds a subscription to the bundle. Cannot be called after seal(). */
void addSubscription(GroupId g);
/** Returns the transport details contained in the bundle. */
Map<String, String> getTransports();
/** Adds a transport detail to the bundle. Cannot be called after seal(). */
void addTransport(String key, String value);
/** Returns the batches of messages contained in the bundle. */
Iterable<Batch> getBatches();

View File

@@ -119,18 +119,8 @@ interface Database<T> {
*/
void addSubscription(T txn, GroupId g) throws DbException;
/**
* Records a contact's subscription to a group.
* <p>
* Locking: contacts read, subscriptions write.
*/
// FIXME: Replace these two methods with a setSubscriptions() method
void addSubscription(T txn, ContactId c, GroupId g) throws DbException;
/**
* Removes all recorded subscriptions for the given contact.
* <p>
* Locking: contacts read, subscriptions write.
*/
void clearSubscriptions(T txn, ContactId c) throws DbException;
/**
@@ -245,6 +235,13 @@ interface Database<T> {
*/
Set<GroupId> getSubscriptions(T txn) throws DbException;
/**
* Returns the local transport details.
* <p>
* Locking: transports read.
*/
Map<String, String> getTransports(T txn) throws DbException;
/**
* Returns the transport details for the given contact.
* <p>
@@ -323,6 +320,14 @@ interface Database<T> {
*/
void setStatus(T txn, ContactId c, MessageId m, Status s) throws DbException;
/**
* Sets the local transport details, replacing any existing transport
* details.
* <p>
* Locking: transports write.
*/
void setTransports(T txn, Map<String, String> transports) throws DbException;
/**
* Sets the transport details for the given contact, replacing any existing
* transport details.

View File

@@ -150,15 +150,21 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String INDEX_STATUSES_BY_CONTACT =
"CREATE INDEX statusesByContact ON statuses (contactId)";
private static final String CREATE_TRANSPORTS =
"CREATE TABLE transports"
private static final String CREATE_CONTACT_TRANSPORTS =
"CREATE TABLE contactTransports"
+ " (contactId INT NOT NULL,"
+ " detailKey VARCHAR NOT NULL,"
+ " detailValue VARCHAR NOT NULL,"
+ " PRIMARY KEY (contactId, detailKey),"
+ " key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (contactId, key),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_LOCAL_TRANSPORTS =
"CREATE TABLE localTransports"
+ " (key VARCHAR NOT NULL,"
+ " value VARCHAR NOT NULL,"
+ " PRIMARY KEY (key))";
private static final Logger LOG =
Logger.getLogger(JdbcDatabase.class.getName());
@@ -249,8 +255,11 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(INDEX_STATUSES_BY_MESSAGE);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT);
if(LOG.isLoggable(Level.FINE))
LOG.fine("Creating transports table");
s.executeUpdate(insertHashType(CREATE_TRANSPORTS));
LOG.fine("Creating contact transports table");
s.executeUpdate(insertHashType(CREATE_CONTACT_TRANSPORTS));
if(LOG.isLoggable(Level.FINE))
LOG.fine("Creating local transports table");
s.executeUpdate(insertHashType(CREATE_LOCAL_TRANSPORTS));
s.close();
} catch(SQLException e) {
tryToClose(s);
@@ -418,8 +427,8 @@ abstract class JdbcDatabase implements Database<Connection> {
ps.close();
// Store the contact's transport details
if(transports != null) {
sql = "INSERT INTO transports"
+ " (contactId, detailKey, detailValue)"
sql = "INSERT INTO contactTransports"
+ " (contactId, key, value)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1102,12 +1111,33 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public Map<String, String> getTransports(Connection txn)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT key, value FROM localTransports";
ps = txn.prepareStatement(sql);
rs = ps.executeQuery();
Map<String, String> transports = new TreeMap<String, String>();
while(rs.next()) transports.put(rs.getString(1), rs.getString(2));
rs.close();
ps.close();
return transports;
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public Map<String, String> getTransports(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT detailKey, detailValue FROM transports"
String sql = "SELECT key, value FROM contactTransports"
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
@@ -1373,20 +1403,52 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public void setTransports(Connection txn, Map<String, String> transports)
throws DbException {
PreparedStatement ps = null;
try {
// Delete any existing transports
String sql = "DELETE FROM localTransports";
ps = txn.prepareStatement(sql);
ps.executeUpdate();
ps.close();
// Store the new transports
if(transports != null) {
sql = "INSERT INTO localTransports (key, value)"
+ " VALUES (?, ?)";
ps = txn.prepareStatement(sql);
for(Entry<String, String> e : transports.entrySet()) {
ps.setString(1, e.getKey());
ps.setString(2, e.getValue());
ps.addBatch();
}
int[] rowsAffectedArray = ps.executeBatch();
assert rowsAffectedArray.length == transports.size();
for(int i = 0; i < rowsAffectedArray.length; i++) {
assert rowsAffectedArray[i] == 1;
}
ps.close();
}
} catch(SQLException e) {
tryToClose(ps);
tryToClose(txn);
throw new DbException(e);
}
}
public void setTransports(Connection txn, ContactId c,
Map<String, String> transports) throws DbException {
PreparedStatement ps = null;
try {
// Delete any existing transports
String sql = "DELETE FROM transports WHERE contactId = ?";
String sql = "DELETE FROM contactTransports WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.executeUpdate();
ps.close();
// Store the new transports
if(transports != null) {
sql = "INSERT INTO transports"
+ " (contactId, detailKey, detailValue)"
sql = "INSERT INTO contactTransports (contactId, key, value)"
+ " VALUES (?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());

View File

@@ -3,6 +3,7 @@ package net.sf.briar.db;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
@@ -237,6 +238,33 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
} finally {
contactLock.readLock().unlock();
}
// Add transport details
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.readLock().lock();
try {
Txn txn = db.startTransaction();
try {
int numTransports = 0;
Map<String, String> transports = db.getTransports(txn);
for(Entry<String, String> e : transports.entrySet()) {
b.addTransport(e.getKey(), e.getValue());
numTransports++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + numTransports + " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.readLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
// Add as many messages as possible to the bundle
long capacity = b.getCapacity();
while(true) {
@@ -448,7 +476,27 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
subscriptionLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
// Update the contact's transport details
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
transportLock.writeLock().lock();
try {
Txn txn = db.startTransaction();
try {
db.setTransports(txn, c, b.getTransports());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
transportLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
// Store the messages
int batches = 0;

View File

@@ -4,6 +4,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -178,6 +179,27 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
// Add transport details
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
int numTransports = 0;
Map<String, String> transports = db.getTransports(txn);
for(Entry<String, String> e : transports.entrySet()) {
b.addTransport(e.getKey(), e.getValue());
numTransports++;
}
if(LOG.isLoggable(Level.FINE))
LOG.fine("Added " + numTransports + " transports");
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
// Add as many messages as possible to the bundle
long capacity = b.getCapacity();
while(true) {
@@ -338,6 +360,20 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> {
}
}
}
// Update the contact's transport details
synchronized(contactLock) {
if(!containsContact(c)) throw new NoSuchContactException();
synchronized(transportLock) {
Txn txn = db.startTransaction();
try {
db.setTransports(txn, c, b.getTransports());
db.commitTransaction(txn);
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
}
}
// Store the messages
int batches = 0;
for(Batch batch : b.getBatches()) {

View File

@@ -511,6 +511,10 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(database).getSubscriptions(txn);
will(returnValue(Collections.singleton(groupId)));
oneOf(bundle).addSubscription(groupId);
// Add transports to the bundle
oneOf(database).getTransports(txn);
will(returnValue(Collections.singletonMap("foo", "bar")));
oneOf(bundle).addTransport("foo", "bar");
// Prepare to add batches to the bundle
oneOf(bundle).getCapacity();
will(returnValue((long) ONE_MEGABYTE));
@@ -575,6 +579,8 @@ public abstract class DatabaseComponentTest extends TestCase {
@Test
public void testReceivedBundle() throws DbException {
final Map<String, String> transports =
Collections.singletonMap("foo", "bar");
Mockery context = new Mockery();
@SuppressWarnings("unchecked")
final Database<Object> database = context.mock(Database.class);
@@ -598,6 +604,10 @@ public abstract class DatabaseComponentTest extends TestCase {
oneOf(bundle).getSubscriptions();
will(returnValue(Collections.singleton(groupId)));
oneOf(database).addSubscription(txn, contactId, groupId);
// Transports
oneOf(bundle).getTransports();
will(returnValue(transports));
oneOf(database).setTransports(txn, contactId, transports);
// Batches
oneOf(bundle).getBatches();
will(returnValue(Collections.singleton(batch)));

View File

@@ -776,6 +776,12 @@ public class H2DatabaseTest extends TestCase {
// Remove the transport details
db.setTransports(txn, contactId, null);
assertEquals(Collections.emptyMap(), db.getTransports(txn, contactId));
// Set the local transport details
db.setTransports(txn, transports);
assertEquals(transports, db.getTransports(txn));
// Remove the local transport details
db.setTransports(txn, null);
assertEquals(Collections.emptyMap(), db.getTransports(txn));
db.commitTransaction(txn);
db.close();