Store outgoing connection numbers in the database.

This commit is contained in:
akwizgran
2011-09-28 19:24:22 +01:00
parent 5aa7da2048
commit 2edb18aee0
4 changed files with 99 additions and 3 deletions

View File

@@ -98,6 +98,12 @@ public interface DatabaseComponent {
void generateTransportUpdate(ContactId c, TransportWriter t) throws
DbException, IOException;
/**
* Returns an outgoing connection number for the given contact and
* transport.
*/
long getConnectionNumber(ContactId c, int transportId) throws DbException;
/**
* Returns the connection reordering window for the given contact and
* transport.

View File

@@ -165,6 +165,15 @@ interface Database<T> {
*/
Collection<BatchId> getBatchesToAck(T txn, ContactId c) throws DbException;
/**
* Allocates and returns a connection number for the given contact and
* transport.
* <p>
* Locking: contacts read, windows write.
*/
long getConnectionNumber(T txn, ContactId c, int transportId)
throws DbException;
/**
* Returns the connection reordering window for the given contact and
* transport.

View File

@@ -632,6 +632,30 @@ DatabaseCleaner.Callback {
LOG.fine("Added " + transports.size() + " transports to update");
}
public long getConnectionNumber(ContactId c, int transportId)
throws DbException {
contactLock.readLock().lock();
try {
if(!containsContact(c)) throw new NoSuchContactException();
windowLock.writeLock().lock();
try {
T txn = db.startTransaction();
try {
long outgoing = db.getConnectionNumber(txn, c, transportId);
db.commitTransaction(txn);
return outgoing;
} catch(DbException e) {
db.abortTransaction(txn);
throw e;
}
} finally {
windowLock.writeLock().unlock();
}
} finally {
contactLock.readLock().unlock();
}
}
public ConnectionWindow getConnectionWindow(ContactId c, int transportId)
throws DbException {
contactLock.readLock().lock();

View File

@@ -194,6 +194,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " transportId INT NOT NULL,"
+ " centre BIGINT NOT NULL,"
+ " bitmap INT NOT NULL,"
+ " outgoing BIGINT NOT NULL,"
+ " PRIMARY KEY (contactId, transportId),"
+ " FOREIGN KEY (contactId) REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
@@ -805,6 +806,56 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
public long getConnectionNumber(Connection txn, ContactId c,
int transportId) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT outgoing FROM connectionWindows"
+ " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, transportId);
rs = ps.executeQuery();
if(rs.next()) {
long outgoing = rs.getLong(1);
if(rs.next()) throw new DbStateException();
rs.close();
ps.close();
sql = "UPDATE connectionWindows SET outgoing = ?"
+ " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, outgoing + 1);
ps.setInt(2, c.getInt());
ps.setInt(3, transportId);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return outgoing;
} else {
rs.close();
ps.close();
sql = "INSERT INTO connectionWindows"
+ " (contactId, transportId, centre, bitmap, outgoing)"
+ " VALUES(?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, transportId);
ps.setLong(3, 0L);
ps.setInt(4, 0);
ps.setLong(5, 0L);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
return 0L;
}
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
public ConnectionWindow getConnectionWindow(Connection txn, ContactId c,
int transportId) throws DbException {
PreparedStatement ps = null;
@@ -1733,23 +1784,29 @@ abstract class JdbcDatabase implements Database<Connection> {
sql = "UPDATE connectionWindows SET centre = ?, bitmap = ?"
+ " WHERE contactId = ? AND transportId = ?";
ps = txn.prepareStatement(sql);
ps.setLong(1, w.getCentre());
ps.setInt(2, w.getBitmap());
ps.setInt(3, c.getInt());
ps.setInt(4, transportId);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
} else {
rs.close();
ps.close();
sql = "INSERT INTO connectionWindows"
+ " (contactId, transportId, centre, bitmap)"
+ " VALUES(?, ?, ?, ?)";
+ " (contactId, transportId, centre, bitmap, outgoing)"
+ " VALUES(?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, transportId);
ps.setLong(3, w.getCentre());
ps.setInt(4, w.getBitmap());
ps.setLong(5, 0L);
int affected = ps.executeUpdate();
if(affected != 1) throw new DbStateException();
ps.close();
}
ps.close();
} catch(SQLException e) {
tryToClose(rs);
tryToClose(ps);