Merge branch '542-retransmission' into 'master'

Don't poll for retransmission

Closes #542

See merge request akwizgran/briar!695
This commit is contained in:
akwizgran
2018-02-22 11:07:21 +00:00
6 changed files with 152 additions and 10 deletions

View File

@@ -378,6 +378,16 @@ public interface DatabaseComponent {
MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m)
throws DbException;
/*
* Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be sent to the given contact. The returned value may
* be zero if a message is due to be sent immediately, or Long.MAX_VALUE if
* no messages are scheduled to be sent.
* <p/>
* Read-only.
*/
long getNextSendTime(Transaction txn, ContactId c) throws DbException;
/**
* Returns all settings in the given namespace.
* <p/>

View File

@@ -449,6 +449,16 @@ interface Database<T> {
Collection<MessageId> getMessagesToShare(T txn, ClientId c)
throws DbException;
/**
* Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be sent to the given contact. The returned value may
* be zero if a message is due to be sent immediately, or Long.MAX_VALUE
* if no messages are scheduled to be sent.
* <p/>
* Read-only.
*/
long getNextSendTime(T txn, ContactId c) throws DbException;
/**
* Returns the message with the given ID, in serialised form, or null if
* the message has been deleted.

View File

@@ -569,6 +569,13 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessageDependents(txn, m);
}
@Override
public long getNextSendTime(Transaction transaction, ContactId c)
throws DbException {
T txn = unbox(transaction);
return db.getNextSendTime(txn, c);
}
@Override
public Settings getSettings(Transaction transaction, String namespace)
throws DbException {

View File

@@ -1929,6 +1929,37 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public long getNextSendTime(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT expiry FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = FALSE"
+ " ORDER BY expiry LIMIT 1";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
rs = ps.executeQuery();
long nextSendTime = Long.MAX_VALUE;
if (rs.next()) {
nextSendTime = rs.getLong(1);
if (rs.next()) throw new AssertionError();
}
rs.close();
ps.close();
return nextSendTime;
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
@Override
@Nullable
public byte[] getRawMessage(Connection txn, MessageId m)

View File

@@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
@@ -50,13 +51,14 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD
@NotNullByDefault
class DuplexOutgoingSession implements SyncSession, EventListener {
// Check for retransmittable records once every 60 seconds
private static final int RETX_QUERY_INTERVAL = 60 * 1000;
private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName());
private static final ThrowingRunnable<IOException> CLOSE = () -> {
};
private static final ThrowingRunnable<IOException>
NEXT_SEND_TIME_DECREASED = () -> {
};
private final DatabaseComponent db;
private final Executor dbExecutor;
@@ -72,6 +74,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false);
private final AtomicBoolean generateRequestQueued =
new AtomicBoolean(false);
private final AtomicLong nextSendTime = new AtomicLong(Long.MAX_VALUE);
private volatile boolean interrupted = false;
@@ -101,15 +104,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
generateRequest();
long now = clock.currentTimeMillis();
long nextKeepalive = now + maxIdleTime;
long nextRetxQuery = now + RETX_QUERY_INTERVAL;
boolean dataToFlush = true;
// Write records until interrupted
try {
while (!interrupted) {
// Work out how long we should wait for a record
now = clock.currentTimeMillis();
long wait = Math.min(nextKeepalive, nextRetxQuery) - now;
if (wait < 0) wait = 0;
long keepaliveWait = Math.max(0, nextKeepalive - now);
long sendWait = Math.max(0, nextSendTime.get() - now);
long wait = Math.min(keepaliveWait, sendWait);
// Flush any unflushed data if we're going to wait
if (wait > 0 && dataToFlush && writerTasks.isEmpty()) {
recordWriter.flush();
@@ -121,20 +124,25 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
MILLISECONDS);
if (task == null) {
now = clock.currentTimeMillis();
if (now >= nextRetxQuery) {
// Check for retransmittable records
if (now >= nextSendTime.get()) {
// Check for retransmittable messages
LOG.info("Checking for retransmittable messages");
setNextSendTime(Long.MAX_VALUE);
generateBatch();
generateOffer();
nextRetxQuery = now + RETX_QUERY_INTERVAL;
}
if (now >= nextKeepalive) {
// Flush the stream to keep it alive
LOG.info("Sending keepalive");
recordWriter.flush();
dataToFlush = false;
nextKeepalive = now + maxIdleTime;
}
} else if (task == CLOSE) {
LOG.info("Closed");
break;
} else if (task == NEXT_SEND_TIME_DECREASED) {
LOG.info("Next send time decreased");
} else {
task.run();
dataToFlush = true;
@@ -170,6 +178,11 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateRequest());
}
private void setNextSendTime(long time) {
long old = nextSendTime.getAndSet(time);
if (time < old) writerTasks.add(NEXT_SEND_TIME_DECREASED);
}
@Override
public void interrupt() {
interrupted = true;
@@ -259,6 +272,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
try {
b = db.generateRequestedBatch(txn, contactId,
MAX_RECORD_PAYLOAD_LENGTH, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);
@@ -305,6 +319,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
try {
o = db.generateOffer(txn, contactId, MAX_MESSAGE_IDS,
maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
} finally {
db.endTransaction(txn);

View File

@@ -1605,7 +1605,6 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
@Test
public void testSetMessageState() throws Exception {
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
@@ -1626,6 +1625,52 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.close();
}
@Test
public void testGetNextSendTime() throws Exception {
long now = System.currentTimeMillis();
Database<Connection> db = open(false, new StoppedClock(now));
Connection txn = db.startTransaction();
// Add a contact, a group and a message
db.addLocalAuthor(txn, localAuthor);
assertEquals(contactId, db.addContact(txn, author, localAuthor.getId(),
true, true));
db.addGroup(txn, group);
db.addMessage(txn, message, UNKNOWN, false, null);
// There should be no messages to send
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
// Share the group with the contact - still no messages to send
db.addGroupVisibility(txn, contactId, groupId, true);
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
// Set the message's state to DELIVERED - still no messages to send
db.setMessageState(txn, messageId, DELIVERED);
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
// Share the message - now it should be sendable immediately
db.setMessageShared(txn, messageId);
assertEquals(0, db.getNextSendTime(txn, contactId));
// Update the message's expiry time as though we sent it - now the
// message should be sendable after one round-trip
db.updateExpiryTime(txn, contactId, messageId, 1000);
assertEquals(now + 2000, db.getNextSendTime(txn, contactId));
// Update the message's expiry time again - now it should be sendable
// after two round-trips
db.updateExpiryTime(txn, contactId, messageId, 1000);
assertEquals(now + 4000, db.getNextSendTime(txn, contactId));
// Delete the message - there should be no messages to send
db.deleteMessage(txn, messageId);
assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId));
db.commitTransaction(txn);
db.close();
}
@Test
public void testExceptionHandling() throws Exception {
Database<Connection> db = open(false);
@@ -1643,8 +1688,13 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
}
private Database<Connection> open(boolean resume) throws Exception {
return open(resume, new SystemClock());
}
private Database<Connection> open(boolean resume, Clock clock)
throws Exception {
Database<Connection> db = createDatabase(
new TestDatabaseConfig(testDir, MAX_SIZE), new SystemClock());
new TestDatabaseConfig(testDir, MAX_SIZE), clock);
if (!resume) TestUtils.deleteTestDirectory(testDir);
db.open();
return db;
@@ -1674,4 +1724,23 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
public void tearDown() {
TestUtils.deleteTestDirectory(testDir);
}
private static class StoppedClock implements Clock {
private final long time;
private StoppedClock(long time) {
this.time = time;
}
@Override
public long currentTimeMillis() {
return time;
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);
}
}
}