Merge branch '2045-flexible-sync' into '1802-sync-via-removable-storage'

Make retransmissions in the sync protocol more flexible

See merge request briar/briar!1482
This commit is contained in:
Torsten Grote
2021-06-16 17:40:25 +00:00
22 changed files with 512 additions and 175 deletions

View File

@@ -190,6 +190,18 @@ public interface DatabaseComponent extends TransactionManager {
Collection<Message> generateBatch(Transaction txn, ContactId c, Collection<Message> generateBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException; int maxLength, int maxLatency) throws DbException;
/**
* Returns a batch of messages for the given contact containing the
* messages with the given IDs, for transmission over a transport with
* the given maximum latency.
* <p/>
* If any of the given messages are not in the database or are not visible
* to the contact, they are omitted from the batch without throwing an
* exception.
*/
Collection<Message> generateBatch(Transaction txn, ContactId c,
Collection<MessageId> ids, int maxLatency) throws DbException;
/** /**
* Returns an offer for the given contact for transmission over a * Returns an offer for the given contact for transmission over a
* transport with the given maximum latency, or null if there are no * transport with the given maximum latency, or null if there are no
@@ -302,16 +314,6 @@ public interface DatabaseComponent extends TransactionManager {
*/ */
Message getMessage(Transaction txn, MessageId m) throws DbException; Message getMessage(Transaction txn, MessageId m) throws DbException;
/**
* Returns the total length, including headers, of any messages that are
* eligible to be sent to the given contact via a transport with the given
* max latency.
* <p/>
* Read-only.
*/
long getMessageBytesToSend(Transaction txn, ContactId c, int maxLatency)
throws DbException;
/** /**
* Returns the IDs of all delivered messages in the given group. * Returns the IDs of all delivered messages in the given group.
* <p/> * <p/>
@@ -446,6 +448,27 @@ public interface DatabaseComponent extends TransactionManager {
MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m) MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m)
throws DbException; throws DbException;
/**
* Returns the IDs of all messages that are eligible to be sent to the
* given contact, together with their raw lengths. This may include
* messages that have already been sent and are not yet due for
* retransmission.
* <p/>
* Read-only.
*/
Map<MessageId, Integer> getUnackedMessagesToSend(Transaction txn,
ContactId c) throws DbException;
/**
* Returns the total length, including headers, of all messages that are
* eligible to be sent to the given contact. This may include messages
* that have already been sent and are not yet due for retransmission.
* <p/>
* Read-only.
*/
long getUnackedMessageBytesToSend(Transaction txn, ContactId c)
throws DbException;
/** /**
* Returns the next time (in milliseconds since the Unix epoch) when a * Returns the next time (in milliseconds since the Unix epoch) when a
* message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE} * message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE}

View File

@@ -22,6 +22,11 @@ public interface TransportConnectionWriter {
*/ */
int getMaxIdleTime(); int getMaxIdleTime();
/**
* Returns true if the transport is lossy and cheap.
*/
boolean isLossyAndCheap();
/** /**
* Returns an output stream for writing to the transport connection. * Returns an output stream for writing to the transport connection.
*/ */

View File

@@ -79,6 +79,11 @@ public abstract class AbstractDuplexTransportConnection
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }
@Override
public boolean isLossyAndCheap() {
return false;
}
@Override @Override
public OutputStream getOutputStream() throws IOException { public OutputStream getOutputStream() throws IOException {
return AbstractDuplexTransportConnection.this.getOutputStream(); return AbstractDuplexTransportConnection.this.getOutputStream();

View File

@@ -15,6 +15,12 @@ import javax.annotation.Nullable;
@NotNullByDefault @NotNullByDefault
public interface SimplexPlugin extends Plugin { public interface SimplexPlugin extends Plugin {
/**
* Returns true if the transport is likely to lose streams and the cost of
* transmitting redundant copies of data is cheap.
*/
boolean isLossyAndCheap();
/** /**
* Attempts to create and return a reader for the given transport * Attempts to create and return a reader for the given transport
* properties. Returns null if a reader cannot be created. * properties. Returns null if a reader cannot be created.

View File

@@ -16,7 +16,7 @@ public interface SyncSessionFactory {
PriorityHandler handler); PriorityHandler handler);
SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, StreamWriter streamWriter); int maxLatency, boolean eager, StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, int maxIdleTime, StreamWriter streamWriter, int maxLatency, int maxIdleTime, StreamWriter streamWriter,

View File

@@ -71,8 +71,10 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter( StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx); w.getOutputStream(), ctx);
ContactId c = requireNonNull(ctx.getContactId()); ContactId c = requireNonNull(ctx.getContactId());
// Use eager retransmission if the transport is lossy and cheap
return syncSessionFactory.createSimplexOutgoingSession(c, return syncSessionFactory.createSimplexOutgoingSession(c,
ctx.getTransportId(), w.getMaxLatency(), streamWriter); ctx.getTransportId(), w.getMaxLatency(), w.isLossyAndCheap(),
streamWriter);
} }
} }

View File

@@ -357,16 +357,6 @@ interface Database<T> {
*/ */
Message getMessage(T txn, MessageId m) throws DbException; Message getMessage(T txn, MessageId m) throws DbException;
/**
* Returns the total length, including headers, of any messages that are
* eligible to be sent to the given contact via a transport with the given
* max latency.
* <p/>
* Read-only.
*/
long getMessageBytesToSend(T txn, ContactId c, int maxLatency)
throws DbException;
/** /**
* Returns the IDs and states of all dependencies of the given message. * Returns the IDs and states of all dependencies of the given message.
* For missing dependencies and dependencies in other groups, the state * For missing dependencies and dependencies in other groups, the state
@@ -496,11 +486,37 @@ interface Database<T> {
* Returns the IDs of some messages that are eligible to be sent to the * Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given total length. * given contact, up to the given total length.
* <p/> * <p/>
* Unlike {@link #getUnackedMessagesToSend(Object, ContactId)} this method
* does not return messages that have already been sent unless they are
* due for retransmission.
* <p/>
* Read-only. * Read-only.
*/ */
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength, Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
int maxLatency) throws DbException; int maxLatency) throws DbException;
/**
* Returns the IDs of all messages that are eligible to be sent to the
* given contact, together with their raw lengths.
* <p/>
* Unlike {@link #getMessagesToSend(Object, ContactId, int, int)} this
* method may return messages that have already been sent and are not yet
* due for retransmission.
* <p/>
* Read-only.
*/
Map<MessageId, Integer> getUnackedMessagesToSend(T txn, ContactId c)
throws DbException;
/**
* Returns the total length, including headers, of all messages that are
* eligible to be sent to the given contact. This may include messages
* that have already been sent and are not yet due for retransmission.
* <p/>
* Read-only.
*/
long getUnackedMessageBytesToSend(T txn, ContactId c) throws DbException;
/** /**
* Returns the IDs of any messages that need to be validated. * Returns the IDs of any messages that need to be validated.
* <p/> * <p/>

View File

@@ -436,6 +436,32 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return messages; return messages;
} }
@Override
public Collection<Message> generateBatch(Transaction transaction,
ContactId c, Collection<MessageId> ids, int maxLatency)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
long totalLength = 0;
List<Message> messages = new ArrayList<>(ids.size());
List<MessageId> sentIds = new ArrayList<>(ids.size());
for (MessageId m : ids) {
if (db.containsVisibleMessage(txn, c, m)) {
Message message = db.getMessage(txn, m);
totalLength += message.getRawLength();
messages.add(message);
sentIds.add(m);
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
}
}
if (messages.isEmpty()) return messages;
db.lowerRequestedFlag(txn, c, sentIds);
transaction.attach(new MessagesSentEvent(c, sentIds, totalLength));
return messages;
}
@Nullable @Nullable
@Override @Override
public Offer generateOffer(Transaction transaction, ContactId c, public Offer generateOffer(Transaction transaction, ContactId c,
@@ -582,15 +608,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessage(txn, m); return db.getMessage(txn, m);
} }
@Override
public long getMessageBytesToSend(Transaction transaction, ContactId c,
int maxLatency) throws DbException {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
return db.getMessageBytesToSend(txn, c, maxLatency);
}
@Override @Override
public Collection<MessageId> getMessageIds(Transaction transaction, public Collection<MessageId> getMessageIds(Transaction transaction,
GroupId g) throws DbException { GroupId g) throws DbException {
@@ -714,6 +731,25 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return status; return status;
} }
@Override
public Map<MessageId, Integer> getUnackedMessagesToSend(
Transaction transaction,
ContactId c) throws DbException {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
return db.getUnackedMessagesToSend(txn, c);
}
@Override
public long getUnackedMessageBytesToSend(Transaction transaction,
ContactId c) throws DbException {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
return db.getUnackedMessageBytesToSend(txn, c);
}
@Override @Override
public Map<MessageId, MessageState> getMessageDependencies( public Map<MessageId, MessageState> getMessageDependencies(
Transaction transaction, MessageId m) throws DbException { Transaction transaction, MessageId m) throws DbException {

View File

@@ -51,6 +51,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -344,6 +345,11 @@ abstract class JdbcDatabase implements Database<Connection> {
"CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp" "CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp"
+ " ON statuses (contactId, timestamp)"; + " ON statuses (contactId, timestamp)";
private static final String
INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP =
"CREATE INDEX IF NOT EXISTS statusesByContactIdTxCountTimestamp"
+ " ON statuses (contactId, txCount, timestamp)";
private static final String INDEX_MESSAGES_BY_CLEANUP_DEADLINE = private static final String INDEX_MESSAGES_BY_CLEANUP_DEADLINE =
"CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline" "CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline"
+ " ON messages (cleanupDeadline)"; + " ON messages (cleanupDeadline)";
@@ -570,6 +576,7 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(INDEX_MESSAGE_DEPENDENCIES_BY_DEPENDENCY_ID); s.executeUpdate(INDEX_MESSAGE_DEPENDENCIES_BY_DEPENDENCY_ID);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP);
s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP);
s.executeUpdate(INDEX_MESSAGES_BY_CLEANUP_DEADLINE); s.executeUpdate(INDEX_MESSAGES_BY_CLEANUP_DEADLINE);
s.close(); s.close();
} catch (SQLException e) { } catch (SQLException e) {
@@ -1781,37 +1788,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public long getMessageBytesToSend(Connection txn, ContactId c,
int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT SUM(length) FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE"
+ " AND (expiry <= ? OR eta > ?)";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setLong(4, eta);
rs = ps.executeQuery();
rs.next();
long total = rs.getInt(1);
rs.close();
ps.close();
return total;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public Collection<MessageId> getMessageIds(Connection txn, GroupId g) public Collection<MessageId> getMessageIds(Connection txn, GroupId g)
throws DbException { throws DbException {
@@ -2259,6 +2235,63 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public Map<MessageId, Integer> getUnackedMessagesToSend(Connection txn,
ContactId c) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT length, messageId FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE"
+ " ORDER BY txCount, timestamp";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
rs = ps.executeQuery();
Map<MessageId, Integer> results = new LinkedHashMap<>();
while (rs.next()) {
int length = rs.getInt(1);
MessageId id = new MessageId(rs.getBytes(2));
results.put(id, length);
}
rs.close();
ps.close();
return results;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public long getUnackedMessageBytesToSend(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT SUM(length) FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
rs = ps.executeQuery();
rs.next();
long total = rs.getInt(1);
rs.close();
ps.close();
return total;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public Collection<MessageId> getMessagesToValidate(Connection txn) public Collection<MessageId> getMessagesToValidate(Connection txn)
throws DbException { throws DbException {

View File

@@ -92,6 +92,11 @@ abstract class AbstractRemovableDrivePlugin implements SimplexPlugin {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean isLossyAndCheap() {
return true;
}
@Override @Override
public TransportConnectionReader createReader(TransportProperties p) { public TransportConnectionReader createReader(TransportProperties p) {
try { try {

View File

@@ -36,6 +36,11 @@ class FileTransportWriter implements TransportConnectionWriter {
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }
@Override
public boolean isLossyAndCheap() {
return plugin.isLossyAndCheap();
}
@Override @Override
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return out; return out;

View File

@@ -60,10 +60,9 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
setSuccess(false); setSuccess(false);
return; return;
} }
int maxLatency = plugin.getMaxLatency();
try { try {
setTotal(db.transactionWithResult(true, txn -> setTotal(db.transactionWithResult(true, txn ->
db.getMessageBytesToSend(txn, contactId, maxLatency))); db.getUnackedMessageBytesToSend(txn, contactId)));
} catch (DbException e) { } catch (DbException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
registry.removeWriter(this); registry.removeWriter(this);
@@ -106,6 +105,11 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl
return delegate.getMaxIdleTime(); return delegate.getMaxIdleTime();
} }
@Override
public boolean isLossyAndCheap() {
return delegate.isLossyAndCheap();
}
@Override @Override
public OutputStream getOutputStream() throws IOException { public OutputStream getOutputStream() throws IOException {
return delegate.getOutputStream(); return delegate.getOutputStream();

View File

@@ -1,8 +1,8 @@
package org.briarproject.bramble.plugin.file; package org.briarproject.bramble.plugin.file;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -17,10 +17,10 @@ class TransportOutputStreamWriter implements TransportConnectionWriter {
private static final Logger LOG = private static final Logger LOG =
getLogger(TransportOutputStreamWriter.class.getName()); getLogger(TransportOutputStreamWriter.class.getName());
private final Plugin plugin; private final SimplexPlugin plugin;
private final OutputStream out; private final OutputStream out;
TransportOutputStreamWriter(Plugin plugin, OutputStream out) { TransportOutputStreamWriter(SimplexPlugin plugin, OutputStream out) {
this.plugin = plugin; this.plugin = plugin;
this.out = out; this.out = out;
} }
@@ -35,6 +35,11 @@ class TransportOutputStreamWriter implements TransportConnectionWriter {
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }
@Override
public boolean isLossyAndCheap() {
return plugin.isLossyAndCheap();
}
@Override @Override
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return out; return out;

View File

@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.sync.Versions;
@@ -22,7 +23,11 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@@ -61,6 +66,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
private final int maxLatency; private final int maxLatency;
private final boolean eager;
private final StreamWriter streamWriter; private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter; private final SyncRecordWriter recordWriter;
private final AtomicInteger outstandingQueries; private final AtomicInteger outstandingQueries;
@@ -70,7 +76,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId, TransportId transportId, EventBus eventBus, ContactId contactId, TransportId transportId,
int maxLatency, StreamWriter streamWriter, int maxLatency, boolean eager, StreamWriter streamWriter,
SyncRecordWriter recordWriter) { SyncRecordWriter recordWriter) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
@@ -78,6 +84,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.eager = eager;
this.streamWriter = streamWriter; this.streamWriter = streamWriter;
this.recordWriter = recordWriter; this.recordWriter = recordWriter;
outstandingQueries = new AtomicInteger(2); // One per type of record outstandingQueries = new AtomicInteger(2); // One per type of record
@@ -92,8 +99,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
// Send our supported protocol versions // Send our supported protocol versions
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
// Start a query for each type of record // Start a query for each type of record
dbExecutor.execute(new GenerateAck()); dbExecutor.execute(this::generateAck);
dbExecutor.execute(new GenerateBatch()); if (eager) dbExecutor.execute(this::loadUnackedMessageIds);
else dbExecutor.execute(this::generateBatch);
// Write records until interrupted or no more records to write // Write records until interrupted or no more records to write
try { try {
while (!interrupted) { while (!interrupted) {
@@ -138,81 +146,110 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
} }
} }
private class GenerateAck implements Runnable { @DatabaseExecutor
private void loadUnackedMessageIds() {
@DatabaseExecutor if (interrupted) return;
@Override try {
public void run() { Map<MessageId, Integer> ids = db.transactionWithResult(true, txn ->
if (interrupted) return; db.getUnackedMessagesToSend(txn, contactId));
try { if (LOG.isLoggable(INFO)) {
Ack a = db.transactionWithNullableResult(false, txn -> LOG.info(ids.size() + " unacked messages to send");
db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a == null) decrementOutstandingQueries();
else writerTasks.add(new WriteAck(a));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
} }
if (ids.isEmpty()) decrementOutstandingQueries();
else dbExecutor.execute(() -> generateEagerBatch(ids));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
} }
} }
private class WriteAck implements ThrowingRunnable<IOException> { @DatabaseExecutor
private void generateEagerBatch(Map<MessageId, Integer> ids) {
private final Ack ack; if (interrupted) return;
// Take some message IDs from `ids` to form a batch
private WriteAck(Ack ack) { Collection<MessageId> batchIds = new ArrayList<>();
this.ack = ack; long totalLength = 0;
Iterator<Entry<MessageId, Integer>> it = ids.entrySet().iterator();
while (it.hasNext()) {
// Check whether the next message will fit in the batch
Entry<MessageId, Integer> e = it.next();
int length = e.getValue();
if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break;
// Add the message to the batch
it.remove();
batchIds.add(e.getKey());
totalLength += length;
} }
if (batchIds.isEmpty()) throw new AssertionError();
@IoExecutor try {
@Override Collection<Message> batch =
public void run() throws IOException { db.transactionWithResult(false, txn ->
if (interrupted) return; db.generateBatch(txn, contactId, batchIds,
recordWriter.writeAck(ack); maxLatency));
LOG.info("Sent ack"); writerTasks.add(() -> writeEagerBatch(batch, ids));
dbExecutor.execute(new GenerateAck()); } catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
} }
} }
private class GenerateBatch implements Runnable { @IoExecutor
private void writeEagerBatch(Collection<Message> batch,
Map<MessageId, Integer> ids) throws IOException {
if (interrupted) return;
for (Message m : batch) recordWriter.writeMessage(m);
LOG.info("Sent eager batch");
if (ids.isEmpty()) decrementOutstandingQueries();
else dbExecutor.execute(() -> generateEagerBatch(ids));
}
@DatabaseExecutor @DatabaseExecutor
@Override private void generateAck() {
public void run() { if (interrupted) return;
if (interrupted) return; try {
try { Ack a = db.transactionWithNullableResult(false, txn ->
Collection<Message> b = db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
db.transactionWithNullableResult(false, txn -> if (LOG.isLoggable(INFO))
db.generateBatch(txn, contactId, LOG.info("Generated ack: " + (a != null));
MAX_RECORD_PAYLOAD_BYTES, maxLatency)); if (a == null) decrementOutstandingQueries();
if (LOG.isLoggable(INFO)) else writerTasks.add(() -> writeAck(a));
LOG.info("Generated batch: " + (b != null)); } catch (DbException e) {
if (b == null) decrementOutstandingQueries(); logException(LOG, WARNING, e);
else writerTasks.add(new WriteBatch(b)); interrupt();
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
} }
} }
private class WriteBatch implements ThrowingRunnable<IOException> { @IoExecutor
private void writeAck(Ack ack) throws IOException {
if (interrupted) return;
recordWriter.writeAck(ack);
LOG.info("Sent ack");
dbExecutor.execute(this::generateAck);
}
private final Collection<Message> batch; @DatabaseExecutor
private void generateBatch() {
private WriteBatch(Collection<Message> batch) { if (interrupted) return;
this.batch = batch; try {
Collection<Message> b =
db.transactionWithNullableResult(false, txn ->
db.generateBatch(txn, contactId,
MAX_RECORD_PAYLOAD_BYTES, maxLatency));
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();
else writerTasks.add(() -> writeBatch(b));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
} }
}
@IoExecutor @IoExecutor
@Override private void writeBatch(Collection<Message> batch) throws IOException {
public void run() throws IOException { if (interrupted) return;
if (interrupted) return; for (Message m : batch) recordWriter.writeMessage(m);
for (Message m : batch) recordWriter.writeMessage(m); LOG.info("Sent batch");
LOG.info("Sent batch"); dbExecutor.execute(this::generateBatch);
dbExecutor.execute(new GenerateBatch());
}
} }
} }

View File

@@ -60,12 +60,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
@Override @Override
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
int maxLatency, StreamWriter streamWriter) { int maxLatency, boolean eager, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream(); OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter = SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out); recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, streamWriter, recordWriter); maxLatency, eager, streamWriter, recordWriter);
} }
@Override @Override

View File

@@ -358,7 +358,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
try { try {
db.transaction(true, transaction -> db.transaction(true, transaction ->
db.getMessageBytesToSend(transaction, contactId, 123)); db.getUnackedMessageBytesToSend(transaction, contactId));
fail(); fail();
} catch (NoSuchContactException expected) { } catch (NoSuchContactException expected) {
// Expected // Expected

View File

@@ -228,7 +228,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
assertEquals(message.getRawLength(), assertEquals(message.getRawLength(),
db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); db.getUnackedMessageBytesToSend(txn, contactId));
// Changing the status to seen = true should make the message unsendable // Changing the status to seen = true should make the message unsendable
db.raiseSeenFlag(txn, contactId, messageId); db.raiseSeenFlag(txn, contactId, messageId);
@@ -236,7 +236,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
db.commitTransaction(txn); db.commitTransaction(txn);
db.close(); db.close();
@@ -261,7 +261,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
// Marking the message delivered should make it sendable // Marking the message delivered should make it sendable
db.setMessageState(txn, messageId, DELIVERED); db.setMessageState(txn, messageId, DELIVERED);
@@ -270,7 +270,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
assertEquals(message.getRawLength(), assertEquals(message.getRawLength(),
db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); db.getUnackedMessageBytesToSend(txn, contactId));
// Marking the message invalid should make it unsendable // Marking the message invalid should make it unsendable
db.setMessageState(txn, messageId, INVALID); db.setMessageState(txn, messageId, INVALID);
@@ -278,7 +278,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
// Marking the message pending should make it unsendable // Marking the message pending should make it unsendable
db.setMessageState(txn, messageId, PENDING); db.setMessageState(txn, messageId, PENDING);
@@ -286,7 +286,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
db.commitTransaction(txn); db.commitTransaction(txn);
db.close(); db.close();
@@ -310,7 +310,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
// Making the group visible should not make the message sendable // Making the group visible should not make the message sendable
db.addGroupVisibility(txn, contactId, groupId, false); db.addGroupVisibility(txn, contactId, groupId, false);
@@ -318,7 +318,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
// Sharing the group should make the message sendable // Sharing the group should make the message sendable
db.setGroupVisibility(txn, contactId, groupId, true); db.setGroupVisibility(txn, contactId, groupId, true);
@@ -327,7 +327,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
assertEquals(message.getRawLength(), assertEquals(message.getRawLength(),
db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); db.getUnackedMessageBytesToSend(txn, contactId));
// Unsharing the group should make the message unsendable // Unsharing the group should make the message unsendable
db.setGroupVisibility(txn, contactId, groupId, false); db.setGroupVisibility(txn, contactId, groupId, false);
@@ -335,7 +335,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
// Making the group invisible should make the message unsendable // Making the group invisible should make the message unsendable
db.removeGroupVisibility(txn, contactId, groupId); db.removeGroupVisibility(txn, contactId, groupId);
@@ -343,7 +343,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
db.commitTransaction(txn); db.commitTransaction(txn);
db.close(); db.close();
@@ -368,7 +368,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId));
// Sharing the message should make it sendable // Sharing the message should make it sendable
db.setMessageShared(txn, messageId, true); db.setMessageShared(txn, messageId, true);
@@ -377,7 +377,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
assertEquals(message.getRawLength(), assertEquals(message.getRawLength(),
db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); db.getUnackedMessageBytesToSend(txn, contactId));
db.commitTransaction(txn); db.commitTransaction(txn);
db.close(); db.close();
@@ -402,14 +402,14 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
MAX_LATENCY); MAX_LATENCY);
assertTrue(ids.isEmpty()); assertTrue(ids.isEmpty());
assertEquals(message.getRawLength(), assertEquals(message.getRawLength(),
db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); db.getUnackedMessageBytesToSend(txn, contactId));
// The message is just the right size to send // The message is just the right size to send
ids = db.getMessagesToSend(txn, contactId, message.getRawLength(), ids = db.getMessagesToSend(txn, contactId, message.getRawLength(),
MAX_LATENCY); MAX_LATENCY);
assertEquals(singletonList(messageId), ids); assertEquals(singletonList(messageId), ids);
assertEquals(message.getRawLength(), assertEquals(message.getRawLength(),
db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); db.getUnackedMessageBytesToSend(txn, contactId));
db.commitTransaction(txn); db.commitTransaction(txn);
db.close(); db.close();

View File

@@ -17,9 +17,14 @@ import org.briarproject.bramble.test.DbExpectations;
import org.briarproject.bramble.test.ImmediateExecutor; import org.briarproject.bramble.test.ImmediateExecutor;
import org.junit.Test; import org.junit.Test;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getMessage;
@@ -39,14 +44,19 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final Executor dbExecutor = new ImmediateExecutor(); private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = getContactId(); private final ContactId contactId = getContactId();
private final TransportId transportId = getTransportId(); private final TransportId transportId = getTransportId();
private final Message message = getMessage(new GroupId(getRandomId())); private final Ack ack =
private final MessageId messageId = message.getId(); new Ack(singletonList(new MessageId(getRandomId())));
private final Message message = getMessage(new GroupId(getRandomId()),
MAX_MESSAGE_BODY_LENGTH);
private final Message message1 = getMessage(new GroupId(getRandomId()),
MAX_MESSAGE_BODY_LENGTH);
@Test @Test
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter); false, streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false);
@@ -63,8 +73,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
// No messages to send // No messages to send
oneOf(db).transactionWithNullableResult(with(false), oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn)); withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId), oneOf(db).generateBatch(noMsgTxn, contactId,
with(any(int.class)), with(MAX_LATENCY)); MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
will(returnValue(null)); will(returnValue(null));
// Send the end of stream marker // Send the end of stream marker
oneOf(streamWriter).sendEndOfStream(); oneOf(streamWriter).sendEndOfStream();
@@ -76,11 +86,44 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
} }
@Test @Test
public void testSomethingToSend() throws Exception { public void testNothingToSendEagerly() throws Exception {
Ack ack = new Ack(singletonList(messageId));
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
streamWriter, recordWriter); true, streamWriter, recordWriter);
Transaction noAckTxn = new Transaction(null, false);
Transaction noIdsTxn = new Transaction(null, true);
context.checking(new DbExpectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// Send the protocol versions
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
// No acks to send
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
// No messages to send
oneOf(db).transactionWithResult(with(true),
withDbCallable(noIdsTxn));
oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId);
will(returnValue(emptyMap()));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
// Remove listener
oneOf(eventBus).removeListener(session);
}});
session.run();
}
@Test
public void testSomethingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
false, streamWriter, recordWriter);
Transaction ackTxn = new Transaction(null, false); Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false);
@@ -100,8 +143,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
// One message to send // One message to send
oneOf(db).transactionWithNullableResult(with(false), oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(msgTxn)); withNullableDbCallable(msgTxn));
oneOf(db).generateBatch(with(msgTxn), with(contactId), oneOf(db).generateBatch(msgTxn, contactId,
with(any(int.class)), with(MAX_LATENCY)); MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
will(returnValue(singletonList(message))); will(returnValue(singletonList(message)));
oneOf(recordWriter).writeMessage(message); oneOf(recordWriter).writeMessage(message);
// No more acks // No more acks
@@ -112,8 +155,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
// No more messages // No more messages
oneOf(db).transactionWithNullableResult(with(false), oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn)); withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId), oneOf(db).generateBatch(noMsgTxn, contactId,
with(any(int.class)), with(MAX_LATENCY)); MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY);
will(returnValue(null)); will(returnValue(null));
// Send the end of stream marker // Send the end of stream marker
oneOf(streamWriter).sendEndOfStream(); oneOf(streamWriter).sendEndOfStream();
@@ -123,4 +166,63 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
session.run(); session.run();
} }
@Test
public void testSomethingToSendEagerly() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, MAX_LATENCY,
true, streamWriter, recordWriter);
Map<MessageId, Integer> unacked = new LinkedHashMap<>();
unacked.put(message.getId(), message.getRawLength());
unacked.put(message1.getId(), message1.getRawLength());
Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false);
Transaction idsTxn = new Transaction(null, true);
Transaction msgTxn = new Transaction(null, false);
Transaction msgTxn1 = new Transaction(null, false);
context.checking(new DbExpectations() {{
// Add listener
oneOf(eventBus).addListener(session);
// Send the protocol versions
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
// One ack to send
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(ackTxn));
oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(ack));
oneOf(recordWriter).writeAck(ack);
// No more acks
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noAckTxn));
oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS);
will(returnValue(null));
// Two messages to send
oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn));
oneOf(db).getUnackedMessagesToSend(idsTxn, contactId);
will(returnValue(unacked));
// Send the first message
oneOf(db).transactionWithResult(with(false),
withDbCallable(msgTxn));
oneOf(db).generateBatch(msgTxn, contactId,
singletonList(message.getId()), MAX_LATENCY);
will(returnValue(singletonList(message)));
oneOf(recordWriter).writeMessage(message);
// Send the second message
oneOf(db).transactionWithResult(with(false),
withDbCallable(msgTxn1));
oneOf(db).generateBatch(msgTxn1, contactId,
singletonList(message1.getId()), MAX_LATENCY);
will(returnValue(singletonList(message1)));
oneOf(recordWriter).writeMessage(message1);
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
// Remove listener
oneOf(eventBus).removeListener(session);
}});
session.run();
}
} }

View File

@@ -25,7 +25,7 @@ public class TestDuplexTransportConnection
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
public TestDuplexTransportConnection(InputStream in, OutputStream out) { public TestDuplexTransportConnection(InputStream in, OutputStream out) {
reader = new TestTransportConnectionReader(in); reader = new TestTransportConnectionReader(in);
writer = new TestTransportConnectionWriter(out); writer = new TestTransportConnectionWriter(out, false);
} }
@Override @Override

View File

@@ -15,10 +15,13 @@ public class TestTransportConnectionWriter
implements TransportConnectionWriter { implements TransportConnectionWriter {
private final OutputStream out; private final OutputStream out;
private final boolean lossyAndCheap;
private final CountDownLatch disposed = new CountDownLatch(1); private final CountDownLatch disposed = new CountDownLatch(1);
public TestTransportConnectionWriter(OutputStream out) { public TestTransportConnectionWriter(OutputStream out,
boolean lossyAndCheap) {
this.out = out; this.out = out;
this.lossyAndCheap = lossyAndCheap;
} }
public CountDownLatch getDisposedLatch() { public CountDownLatch getDisposedLatch() {
@@ -35,6 +38,11 @@ public class TestTransportConnectionWriter
return 60_000; return 60_000;
} }
@Override
public boolean isLossyAndCheap() {
return lossyAndCheap;
}
@Override @Override
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return out; return out;

View File

@@ -11,7 +11,9 @@ import org.briarproject.bramble.api.identity.IdentityManager;
import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import org.briarproject.bramble.test.TestDatabaseConfigModule; import org.briarproject.bramble.test.TestDatabaseConfigModule;
import org.briarproject.bramble.test.TestTransportConnectionReader; import org.briarproject.bramble.test.TestTransportConnectionReader;
import org.briarproject.bramble.test.TestTransportConnectionWriter; import org.briarproject.bramble.test.TestTransportConnectionWriter;
@@ -71,7 +73,16 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
} }
@Test @Test
public void testWriteAndRead() throws Exception { public void testWriteAndReadWithLazyRetransmission() throws Exception {
testWriteAndRead(false);
}
@Test
public void testWriteAndReadWithEagerRetransmission() throws Exception {
testWriteAndRead(true);
}
private void testWriteAndRead(boolean eager) throws Exception {
// Create the identities // Create the identities
Identity aliceIdentity = Identity aliceIdentity =
alice.getIdentityManager().createIdentity("Alice"); alice.getIdentityManager().createIdentity("Alice");
@@ -86,16 +97,21 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
bob.getEventBus().addListener(listener); bob.getEventBus().addListener(listener);
// Alice sends a private message to Bob // Alice sends a private message to Bob
sendMessage(alice, bobId); sendMessage(alice, bobId);
// Sync Alice's client versions and transport properties // Sync Alice's client versions
read(bob, write(alice, bobId), 2); read(bob, write(alice, bobId, eager, 1), 1);
// Sync Bob's client versions and transport properties // Sync Bob's client versions
read(alice, write(bob, aliceId), 2); read(alice, write(bob, aliceId, eager, 1), 1);
// Sync the private message and the attachment // Sync Alice's second client versioning update (with the active flag
read(bob, write(alice, bobId), 2); // raised), the private message and the attachment
read(bob, write(alice, bobId, eager, 3), 3);
// Bob should have received the private message // Bob should have received the private message
assertTrue(listener.messageAdded); assertTrue(listener.messageAdded);
// Bob should have received the attachment // Bob should have received the attachment
assertTrue(listener.attachmentAdded); assertTrue(listener.attachmentAdded);
// Sync messages from Alice to Bob again. If using eager
// retransmission, the three unacked messages should be sent again.
// They're all duplicates, so no further deliveries should occur
read(bob, write(alice, bobId, eager, eager ? 3 : 0), 0);
} }
private ContactId setUp(SimplexMessagingIntegrationTestComponent device, private ContactId setUp(SimplexMessagingIntegrationTestComponent device,
@@ -149,15 +165,24 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
} }
private byte[] write(SimplexMessagingIntegrationTestComponent device, private byte[] write(SimplexMessagingIntegrationTestComponent device,
ContactId contactId) throws Exception { ContactId contactId, boolean eager, int transmissions)
throws Exception {
// Listen for message transmissions
MessageTransmissionListener listener =
new MessageTransmissionListener(transmissions);
device.getEventBus().addListener(listener);
// Write the outgoing stream // Write the outgoing stream
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestTransportConnectionWriter writer = TestTransportConnectionWriter writer =
new TestTransportConnectionWriter(out); new TestTransportConnectionWriter(out, eager);
device.getConnectionManager().manageOutgoingConnection(contactId, device.getConnectionManager().manageOutgoingConnection(contactId,
SIMPLEX_TRANSPORT_ID, writer); SIMPLEX_TRANSPORT_ID, writer);
// Wait for the writer to be disposed // Wait for the writer to be disposed
writer.getDisposedLatch().await(TIMEOUT_MS, MILLISECONDS); writer.getDisposedLatch().await(TIMEOUT_MS, MILLISECONDS);
// Check that the expected number of messages were sent
assertTrue(listener.sent.await(TIMEOUT_MS, MILLISECONDS));
// Clean up the listener
device.getEventBus().removeListener(listener);
// Return the contents of the stream // Return the contents of the stream
return out.toByteArray(); return out.toByteArray();
} }
@@ -178,6 +203,24 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
deleteTestDirectory(testDir); deleteTestDirectory(testDir);
} }
@NotNullByDefault
private static class MessageTransmissionListener implements EventListener {
private final CountDownLatch sent;
private MessageTransmissionListener(int transmissions) {
sent = new CountDownLatch(transmissions);
}
@Override
public void eventOccurred(Event e) {
if (e instanceof MessagesSentEvent) {
MessagesSentEvent m = (MessagesSentEvent) e;
for (MessageId ignored : m.getMessageIds()) sent.countDown();
}
}
}
@NotNullByDefault @NotNullByDefault
private static class MessageDeliveryListener implements EventListener { private static class MessageDeliveryListener implements EventListener {
@@ -191,7 +234,9 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
public void eventOccurred(Event e) { public void eventOccurred(Event e) {
if (e instanceof MessageStateChangedEvent) { if (e instanceof MessageStateChangedEvent) {
MessageStateChangedEvent m = (MessageStateChangedEvent) e; MessageStateChangedEvent m = (MessageStateChangedEvent) e;
if (m.getState().equals(DELIVERED)) delivered.countDown(); if (!m.isLocal() && m.getState().equals(DELIVERED)) {
delivered.countDown();
}
} }
} }
} }

View File

@@ -429,7 +429,7 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
// Write the messages to a transport stream // Write the messages to a transport stream
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestTransportConnectionWriter writer = TestTransportConnectionWriter writer =
new TestTransportConnectionWriter(out); new TestTransportConnectionWriter(out, false);
fromComponent.getConnectionManager().manageOutgoingConnection(toId, fromComponent.getConnectionManager().manageOutgoingConnection(toId,
SIMPLEX_TRANSPORT_ID, writer); SIMPLEX_TRANSPORT_ID, writer);
writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS); writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS);
@@ -487,7 +487,7 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
// start outgoing connection // start outgoing connection
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestTransportConnectionWriter writer = TestTransportConnectionWriter writer =
new TestTransportConnectionWriter(out); new TestTransportConnectionWriter(out, false);
fromComponent.getConnectionManager().manageOutgoingConnection(toId, fromComponent.getConnectionManager().manageOutgoingConnection(toId,
SIMPLEX_TRANSPORT_ID, writer); SIMPLEX_TRANSPORT_ID, writer);
writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS); writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS);