diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 2771302a2..0d055641a 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -190,6 +190,18 @@ public interface DatabaseComponent extends TransactionManager { Collection generateBatch(Transaction txn, ContactId c, int maxLength, int maxLatency) throws DbException; + /** + * Returns a batch of messages for the given contact containing the + * messages with the given IDs, for transmission over a transport with + * the given maximum latency. + *

+ * If any of the given messages are not in the database or are not visible + * to the contact, they are omitted from the batch without throwing an + * exception. + */ + Collection generateBatch(Transaction txn, ContactId c, + Collection ids, int maxLatency) throws DbException; + /** * Returns an offer for the given contact for transmission over a * transport with the given maximum latency, or null if there are no @@ -302,16 +314,6 @@ public interface DatabaseComponent extends TransactionManager { */ Message getMessage(Transaction txn, MessageId m) throws DbException; - /** - * Returns the total length, including headers, of any messages that are - * eligible to be sent to the given contact via a transport with the given - * max latency. - *

- * Read-only. - */ - long getMessageBytesToSend(Transaction txn, ContactId c, int maxLatency) - throws DbException; - /** * Returns the IDs of all delivered messages in the given group. *

@@ -446,6 +448,27 @@ public interface DatabaseComponent extends TransactionManager { MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m) throws DbException; + /** + * Returns the IDs of all messages that are eligible to be sent to the + * given contact, together with their raw lengths. This may include + * messages that have already been sent and are not yet due for + * retransmission. + *

+ * Read-only. + */ + Map getUnackedMessagesToSend(Transaction txn, + ContactId c) throws DbException; + + /** + * Returns the total length, including headers, of all messages that are + * eligible to be sent to the given contact. This may include messages + * that have already been sent and are not yet due for retransmission. + *

+ * Read-only. + */ + long getUnackedMessageBytesToSend(Transaction txn, ContactId c) + throws DbException; + /** * Returns the next time (in milliseconds since the Unix epoch) when a * message is due to be deleted, or {@link #NO_CLEANUP_DEADLINE} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java index 219f33efe..4ed5ba1ed 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/TransportConnectionWriter.java @@ -22,6 +22,11 @@ public interface TransportConnectionWriter { */ int getMaxIdleTime(); + /** + * Returns true if the transport is lossy and cheap. + */ + boolean isLossyAndCheap(); + /** * Returns an output stream for writing to the transport connection. */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java index 27aad596a..64a13f5e1 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/duplex/AbstractDuplexTransportConnection.java @@ -79,6 +79,11 @@ public abstract class AbstractDuplexTransportConnection return plugin.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return false; + } + @Override public OutputStream getOutputStream() throws IOException { return AbstractDuplexTransportConnection.this.getOutputStream(); diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java index 9df61968d..f7cf1e801 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/plugin/simplex/SimplexPlugin.java @@ -15,6 +15,12 @@ import javax.annotation.Nullable; @NotNullByDefault public interface SimplexPlugin extends Plugin { + /** + * Returns true if the transport is likely to lose streams and the cost of + * transmitting redundant copies of data is cheap. + */ + boolean isLossyAndCheap(); + /** * Attempts to create and return a reader for the given transport * properties. Returns null if a reader cannot be created. diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index a19e211fb..e863089ff 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -16,7 +16,7 @@ public interface SyncSessionFactory { PriorityHandler handler); SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, - int maxLatency, StreamWriter streamWriter); + int maxLatency, boolean eager, StreamWriter streamWriter); SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, int maxLatency, int maxIdleTime, StreamWriter streamWriter, diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java index a5ad6dfc8..9bec08193 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java @@ -71,8 +71,10 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); + // Use eager retransmission if the transport is lossy and cheap return syncSessionFactory.createSimplexOutgoingSession(c, - ctx.getTransportId(), w.getMaxLatency(), streamWriter); + ctx.getTransportId(), w.getMaxLatency(), w.isLossyAndCheap(), + streamWriter); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index 705913ee7..64f54f19b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -357,16 +357,6 @@ interface Database { */ Message getMessage(T txn, MessageId m) throws DbException; - /** - * Returns the total length, including headers, of any messages that are - * eligible to be sent to the given contact via a transport with the given - * max latency. - *

- * Read-only. - */ - long getMessageBytesToSend(T txn, ContactId c, int maxLatency) - throws DbException; - /** * Returns the IDs and states of all dependencies of the given message. * For missing dependencies and dependencies in other groups, the state @@ -496,11 +486,37 @@ interface Database { * Returns the IDs of some messages that are eligible to be sent to the * given contact, up to the given total length. *

+ * Unlike {@link #getUnackedMessagesToSend(Object, ContactId)} this method + * does not return messages that have already been sent unless they are + * due for retransmission. + *

* Read-only. */ Collection getMessagesToSend(T txn, ContactId c, int maxLength, int maxLatency) throws DbException; + /** + * Returns the IDs of all messages that are eligible to be sent to the + * given contact, together with their raw lengths. + *

+ * Unlike {@link #getMessagesToSend(Object, ContactId, int, int)} this + * method may return messages that have already been sent and are not yet + * due for retransmission. + *

+ * Read-only. + */ + Map getUnackedMessagesToSend(T txn, ContactId c) + throws DbException; + + /** + * Returns the total length, including headers, of all messages that are + * eligible to be sent to the given contact. This may include messages + * that have already been sent and are not yet due for retransmission. + *

+ * Read-only. + */ + long getUnackedMessageBytesToSend(T txn, ContactId c) throws DbException; + /** * Returns the IDs of any messages that need to be validated. *

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index b2422f9bc..c92245acd 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -436,6 +436,32 @@ class DatabaseComponentImpl implements DatabaseComponent { return messages; } + @Override + public Collection generateBatch(Transaction transaction, + ContactId c, Collection ids, int maxLatency) + throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + long totalLength = 0; + List messages = new ArrayList<>(ids.size()); + List sentIds = new ArrayList<>(ids.size()); + for (MessageId m : ids) { + if (db.containsVisibleMessage(txn, c, m)) { + Message message = db.getMessage(txn, m); + totalLength += message.getRawLength(); + messages.add(message); + sentIds.add(m); + db.updateExpiryTimeAndEta(txn, c, m, maxLatency); + } + } + if (messages.isEmpty()) return messages; + db.lowerRequestedFlag(txn, c, sentIds); + transaction.attach(new MessagesSentEvent(c, sentIds, totalLength)); + return messages; + } + @Nullable @Override public Offer generateOffer(Transaction transaction, ContactId c, @@ -582,15 +608,6 @@ class DatabaseComponentImpl implements DatabaseComponent { return db.getMessage(txn, m); } - @Override - public long getMessageBytesToSend(Transaction transaction, ContactId c, - int maxLatency) throws DbException { - T txn = unbox(transaction); - if (!db.containsContact(txn, c)) - throw new NoSuchContactException(); - return db.getMessageBytesToSend(txn, c, maxLatency); - } - @Override public Collection getMessageIds(Transaction transaction, GroupId g) throws DbException { @@ -714,6 +731,25 @@ class DatabaseComponentImpl implements DatabaseComponent { return status; } + @Override + public Map getUnackedMessagesToSend( + Transaction transaction, + ContactId c) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.getUnackedMessagesToSend(txn, c); + } + + @Override + public long getUnackedMessageBytesToSend(Transaction transaction, + ContactId c) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.getUnackedMessageBytesToSend(txn, c); + } + @Override public Map getMessageDependencies( Transaction transaction, MessageId m) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 224b20bf1..c77010b82 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -51,6 +51,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -344,6 +345,11 @@ abstract class JdbcDatabase implements Database { "CREATE INDEX IF NOT EXISTS statusesByContactIdTimestamp" + " ON statuses (contactId, timestamp)"; + private static final String + INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP = + "CREATE INDEX IF NOT EXISTS statusesByContactIdTxCountTimestamp" + + " ON statuses (contactId, txCount, timestamp)"; + private static final String INDEX_MESSAGES_BY_CLEANUP_DEADLINE = "CREATE INDEX IF NOT EXISTS messagesByCleanupDeadline" + " ON messages (cleanupDeadline)"; @@ -570,6 +576,7 @@ abstract class JdbcDatabase implements Database { s.executeUpdate(INDEX_MESSAGE_DEPENDENCIES_BY_DEPENDENCY_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_GROUP_ID); s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TIMESTAMP); + s.executeUpdate(INDEX_STATUSES_BY_CONTACT_ID_TX_COUNT_TIMESTAMP); s.executeUpdate(INDEX_MESSAGES_BY_CLEANUP_DEADLINE); s.close(); } catch (SQLException e) { @@ -1781,37 +1788,6 @@ abstract class JdbcDatabase implements Database { } } - @Override - public long getMessageBytesToSend(Connection txn, ContactId c, - int maxLatency) throws DbException { - long now = clock.currentTimeMillis(); - long eta = now + maxLatency; - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT SUM(length) FROM statuses" - + " WHERE contactId = ? AND state = ?" - + " AND groupShared = TRUE AND messageShared = TRUE" - + " AND deleted = FALSE AND seen = FALSE" - + " AND (expiry <= ? OR eta > ?)"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setInt(2, DELIVERED.getValue()); - ps.setLong(3, now); - ps.setLong(4, eta); - rs = ps.executeQuery(); - rs.next(); - long total = rs.getInt(1); - rs.close(); - ps.close(); - return total; - } catch (SQLException e) { - tryToClose(rs, LOG, WARNING); - tryToClose(ps, LOG, WARNING); - throw new DbException(e); - } - } - @Override public Collection getMessageIds(Connection txn, GroupId g) throws DbException { @@ -2259,6 +2235,63 @@ abstract class JdbcDatabase implements Database { } } + @Override + public Map getUnackedMessagesToSend(Connection txn, + ContactId c) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT length, messageId FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE AND seen = FALSE" + + " ORDER BY txCount, timestamp"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + rs = ps.executeQuery(); + Map results = new LinkedHashMap<>(); + while (rs.next()) { + int length = rs.getInt(1); + MessageId id = new MessageId(rs.getBytes(2)); + results.put(id, length); + } + rs.close(); + ps.close(); + return results; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + + @Override + public long getUnackedMessageBytesToSend(Connection txn, ContactId c) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT SUM(length) FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE AND seen = FALSE"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + rs = ps.executeQuery(); + rs.next(); + long total = rs.getInt(1); + rs.close(); + ps.close(); + return total; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public Collection getMessagesToValidate(Connection txn) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java index 356186355..8e8d35488 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/AbstractRemovableDrivePlugin.java @@ -92,6 +92,11 @@ abstract class AbstractRemovableDrivePlugin implements SimplexPlugin { throw new UnsupportedOperationException(); } + @Override + public boolean isLossyAndCheap() { + return true; + } + @Override public TransportConnectionReader createReader(TransportProperties p) { try { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java index 2ab164784..c1d9c6748 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/FileTransportWriter.java @@ -36,6 +36,11 @@ class FileTransportWriter implements TransportConnectionWriter { return plugin.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return plugin.isLossyAndCheap(); + } + @Override public OutputStream getOutputStream() { return out; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java index 26f5c4969..1055f9de9 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveWriterTask.java @@ -60,10 +60,9 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl setSuccess(false); return; } - int maxLatency = plugin.getMaxLatency(); try { setTotal(db.transactionWithResult(true, txn -> - db.getMessageBytesToSend(txn, contactId, maxLatency))); + db.getUnackedMessageBytesToSend(txn, contactId))); } catch (DbException e) { logException(LOG, WARNING, e); registry.removeWriter(this); @@ -106,6 +105,11 @@ class RemovableDriveWriterTask extends RemovableDriveTaskImpl return delegate.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return delegate.isLossyAndCheap(); + } + @Override public OutputStream getOutputStream() throws IOException { return delegate.getOutputStream(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java index be40fbca0..db7ba5555 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/TransportOutputStreamWriter.java @@ -1,8 +1,8 @@ package org.briarproject.bramble.plugin.file; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.plugin.Plugin; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; +import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import java.io.OutputStream; import java.util.logging.Logger; @@ -17,10 +17,10 @@ class TransportOutputStreamWriter implements TransportConnectionWriter { private static final Logger LOG = getLogger(TransportOutputStreamWriter.class.getName()); - private final Plugin plugin; + private final SimplexPlugin plugin; private final OutputStream out; - TransportOutputStreamWriter(Plugin plugin, OutputStream out) { + TransportOutputStreamWriter(SimplexPlugin plugin, OutputStream out) { this.plugin = plugin; this.out = out; } @@ -35,6 +35,11 @@ class TransportOutputStreamWriter implements TransportConnectionWriter { return plugin.getMaxIdleTime(); } + @Override + public boolean isLossyAndCheap() { + return plugin.isLossyAndCheap(); + } + @Override public OutputStream getOutputStream() { return out; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 1d32ca4ee..f5c7fcbb6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -15,6 +15,7 @@ import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.Versions; @@ -22,7 +23,11 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -61,6 +66,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final ContactId contactId; private final TransportId transportId; private final int maxLatency; + private final boolean eager; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; private final AtomicInteger outstandingQueries; @@ -70,7 +76,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, ContactId contactId, TransportId transportId, - int maxLatency, StreamWriter streamWriter, + int maxLatency, boolean eager, StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; this.dbExecutor = dbExecutor; @@ -78,6 +84,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.contactId = contactId; this.transportId = transportId; this.maxLatency = maxLatency; + this.eager = eager; this.streamWriter = streamWriter; this.recordWriter = recordWriter; outstandingQueries = new AtomicInteger(2); // One per type of record @@ -92,8 +99,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener { // Send our supported protocol versions recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); // Start a query for each type of record - dbExecutor.execute(new GenerateAck()); - dbExecutor.execute(new GenerateBatch()); + dbExecutor.execute(this::generateAck); + if (eager) dbExecutor.execute(this::loadUnackedMessageIds); + else dbExecutor.execute(this::generateBatch); // Write records until interrupted or no more records to write try { while (!interrupted) { @@ -138,81 +146,110 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } } - private class GenerateAck implements Runnable { - - @DatabaseExecutor - @Override - public void run() { - if (interrupted) return; - try { - Ack a = db.transactionWithNullableResult(false, txn -> - db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated ack: " + (a != null)); - if (a == null) decrementOutstandingQueries(); - else writerTasks.add(new WriteAck(a)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); + @DatabaseExecutor + private void loadUnackedMessageIds() { + if (interrupted) return; + try { + Map ids = db.transactionWithResult(true, txn -> + db.getUnackedMessagesToSend(txn, contactId)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " unacked messages to send"); } + if (ids.isEmpty()) decrementOutstandingQueries(); + else dbExecutor.execute(() -> generateEagerBatch(ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } } - private class WriteAck implements ThrowingRunnable { - - private final Ack ack; - - private WriteAck(Ack ack) { - this.ack = ack; + @DatabaseExecutor + private void generateEagerBatch(Map ids) { + if (interrupted) return; + // Take some message IDs from `ids` to form a batch + Collection batchIds = new ArrayList<>(); + long totalLength = 0; + Iterator> it = ids.entrySet().iterator(); + while (it.hasNext()) { + // Check whether the next message will fit in the batch + Entry e = it.next(); + int length = e.getValue(); + if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break; + // Add the message to the batch + it.remove(); + batchIds.add(e.getKey()); + totalLength += length; } - - @IoExecutor - @Override - public void run() throws IOException { - if (interrupted) return; - recordWriter.writeAck(ack); - LOG.info("Sent ack"); - dbExecutor.execute(new GenerateAck()); + if (batchIds.isEmpty()) throw new AssertionError(); + try { + Collection batch = + db.transactionWithResult(false, txn -> + db.generateBatch(txn, contactId, batchIds, + maxLatency)); + writerTasks.add(() -> writeEagerBatch(batch, ids)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } } - private class GenerateBatch implements Runnable { + @IoExecutor + private void writeEagerBatch(Collection batch, + Map ids) throws IOException { + if (interrupted) return; + for (Message m : batch) recordWriter.writeMessage(m); + LOG.info("Sent eager batch"); + if (ids.isEmpty()) decrementOutstandingQueries(); + else dbExecutor.execute(() -> generateEagerBatch(ids)); + } - @DatabaseExecutor - @Override - public void run() { - if (interrupted) return; - try { - Collection b = - db.transactionWithNullableResult(false, txn -> - db.generateBatch(txn, contactId, - MAX_RECORD_PAYLOAD_BYTES, maxLatency)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated batch: " + (b != null)); - if (b == null) decrementOutstandingQueries(); - else writerTasks.add(new WriteBatch(b)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } + @DatabaseExecutor + private void generateAck() { + if (interrupted) return; + try { + Ack a = db.transactionWithNullableResult(false, txn -> + db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated ack: " + (a != null)); + if (a == null) decrementOutstandingQueries(); + else writerTasks.add(() -> writeAck(a)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } } - private class WriteBatch implements ThrowingRunnable { + @IoExecutor + private void writeAck(Ack ack) throws IOException { + if (interrupted) return; + recordWriter.writeAck(ack); + LOG.info("Sent ack"); + dbExecutor.execute(this::generateAck); + } - private final Collection batch; - - private WriteBatch(Collection batch) { - this.batch = batch; + @DatabaseExecutor + private void generateBatch() { + if (interrupted) return; + try { + Collection b = + db.transactionWithNullableResult(false, txn -> + db.generateBatch(txn, contactId, + MAX_RECORD_PAYLOAD_BYTES, maxLatency)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated batch: " + (b != null)); + if (b == null) decrementOutstandingQueries(); + else writerTasks.add(() -> writeBatch(b)); + } catch (DbException e) { + logException(LOG, WARNING, e); + interrupt(); } + } - @IoExecutor - @Override - public void run() throws IOException { - if (interrupted) return; - for (Message m : batch) recordWriter.writeMessage(m); - LOG.info("Sent batch"); - dbExecutor.execute(new GenerateBatch()); - } + @IoExecutor + private void writeBatch(Collection batch) throws IOException { + if (interrupted) return; + for (Message m : batch) recordWriter.writeMessage(m); + LOG.info("Sent batch"); + dbExecutor.execute(this::generateBatch); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 4c590df73..37fe1f9d1 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -60,12 +60,12 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { @Override public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, - int maxLatency, StreamWriter streamWriter) { + int maxLatency, boolean eager, StreamWriter streamWriter) { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, - maxLatency, streamWriter, recordWriter); + maxLatency, eager, streamWriter, recordWriter); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index 4b53a781d..a46f893ea 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -358,7 +358,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { try { db.transaction(true, transaction -> - db.getMessageBytesToSend(transaction, contactId, 123)); + db.getUnackedMessageBytesToSend(transaction, contactId)); fail(); } catch (NoSuchContactException expected) { // Expected diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index 2da33c18d..db6b3411b 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -228,7 +228,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // Changing the status to seen = true should make the message unsendable db.raiseSeenFlag(txn, contactId, messageId); @@ -236,7 +236,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -261,7 +261,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Marking the message delivered should make it sendable db.setMessageState(txn, messageId, DELIVERED); @@ -270,7 +270,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // Marking the message invalid should make it unsendable db.setMessageState(txn, messageId, INVALID); @@ -278,7 +278,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Marking the message pending should make it unsendable db.setMessageState(txn, messageId, PENDING); @@ -286,7 +286,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -310,7 +310,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Making the group visible should not make the message sendable db.addGroupVisibility(txn, contactId, groupId, false); @@ -318,7 +318,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Sharing the group should make the message sendable db.setGroupVisibility(txn, contactId, groupId, true); @@ -327,7 +327,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // Unsharing the group should make the message unsendable db.setGroupVisibility(txn, contactId, groupId, false); @@ -335,7 +335,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Making the group invisible should make the message unsendable db.removeGroupVisibility(txn, contactId, groupId); @@ -343,7 +343,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -368,7 +368,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { assertTrue(ids.isEmpty()); ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertTrue(ids.isEmpty()); - assertEquals(0, db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); // Sharing the message should make it sendable db.setMessageShared(txn, messageId, true); @@ -377,7 +377,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { ids = db.getMessagesToOffer(txn, contactId, 100, MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); @@ -402,14 +402,14 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { MAX_LATENCY); assertTrue(ids.isEmpty()); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); // The message is just the right size to send ids = db.getMessagesToSend(txn, contactId, message.getRawLength(), MAX_LATENCY); assertEquals(singletonList(messageId), ids); assertEquals(message.getRawLength(), - db.getMessageBytesToSend(txn, contactId, MAX_LATENCY)); + db.getUnackedMessageBytesToSend(txn, contactId)); db.commitTransaction(txn); db.close(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index df8dc7b87..9bcf7cc3f 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -17,9 +17,14 @@ import org.briarproject.bramble.test.DbExpectations; import org.briarproject.bramble.test.ImmediateExecutor; import org.junit.Test; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.Executor; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getMessage; @@ -39,14 +44,19 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { private final Executor dbExecutor = new ImmediateExecutor(); private final ContactId contactId = getContactId(); private final TransportId transportId = getTransportId(); - private final Message message = getMessage(new GroupId(getRandomId())); - private final MessageId messageId = message.getId(); + private final Ack ack = + new Ack(singletonList(new MessageId(getRandomId()))); + private final Message message = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); + private final Message message1 = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter); + false, streamWriter, recordWriter); + Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -63,8 +73,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // No messages to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); - oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(noMsgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -76,11 +86,44 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { } @Test - public void testSomethingToSend() throws Exception { - Ack ack = new Ack(singletonList(messageId)); + public void testNothingToSendEagerly() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter); + true, streamWriter, recordWriter); + + Transaction noAckTxn = new Transaction(null, false); + Transaction noIdsTxn = new Transaction(null, true); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // No acks to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // No messages to send + oneOf(db).transactionWithResult(with(true), + withDbCallable(noIdsTxn)); + oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId); + will(returnValue(emptyMap())); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } + + @Test + public void testSomethingToSend() throws Exception { + SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + false, streamWriter, recordWriter); + Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false); @@ -100,8 +143,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // One message to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(msgTxn)); - oneOf(db).generateBatch(with(msgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(msgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(singletonList(message))); oneOf(recordWriter).writeMessage(message); // No more acks @@ -112,8 +155,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { // No more messages oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); - oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(MAX_LATENCY)); + oneOf(db).generateBatch(noMsgTxn, contactId, + MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -123,4 +166,63 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { session.run(); } + + @Test + public void testSomethingToSendEagerly() throws Exception { + SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + true, streamWriter, recordWriter); + + Map unacked = new LinkedHashMap<>(); + unacked.put(message.getId(), message.getRawLength()); + unacked.put(message1.getId(), message1.getRawLength()); + + Transaction ackTxn = new Transaction(null, false); + Transaction noAckTxn = new Transaction(null, false); + Transaction idsTxn = new Transaction(null, true); + Transaction msgTxn = new Transaction(null, false); + Transaction msgTxn1 = new Transaction(null, false); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // One ack to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(ackTxn)); + oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(ack)); + oneOf(recordWriter).writeAck(ack); + // No more acks + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // Two messages to send + oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn)); + oneOf(db).getUnackedMessagesToSend(idsTxn, contactId); + will(returnValue(unacked)); + // Send the first message + oneOf(db).transactionWithResult(with(false), + withDbCallable(msgTxn)); + oneOf(db).generateBatch(msgTxn, contactId, + singletonList(message.getId()), MAX_LATENCY); + will(returnValue(singletonList(message))); + oneOf(recordWriter).writeMessage(message); + // Send the second message + oneOf(db).transactionWithResult(with(false), + withDbCallable(msgTxn1)); + oneOf(db).generateBatch(msgTxn1, contactId, + singletonList(message1.getId()), MAX_LATENCY); + will(returnValue(singletonList(message1))); + oneOf(recordWriter).writeMessage(message1); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java index 0ed6f9045..83ec8129e 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestDuplexTransportConnection.java @@ -25,7 +25,7 @@ public class TestDuplexTransportConnection @SuppressWarnings("WeakerAccess") public TestDuplexTransportConnection(InputStream in, OutputStream out) { reader = new TestTransportConnectionReader(in); - writer = new TestTransportConnectionWriter(out); + writer = new TestTransportConnectionWriter(out, false); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java b/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java index 238208207..44e570fd7 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/test/TestTransportConnectionWriter.java @@ -15,10 +15,13 @@ public class TestTransportConnectionWriter implements TransportConnectionWriter { private final OutputStream out; + private final boolean lossyAndCheap; private final CountDownLatch disposed = new CountDownLatch(1); - public TestTransportConnectionWriter(OutputStream out) { + public TestTransportConnectionWriter(OutputStream out, + boolean lossyAndCheap) { this.out = out; + this.lossyAndCheap = lossyAndCheap; } public CountDownLatch getDisposedLatch() { @@ -35,6 +38,11 @@ public class TestTransportConnectionWriter return 60_000; } + @Override + public boolean isLossyAndCheap() { + return lossyAndCheap; + } + @Override public OutputStream getOutputStream() { return out; diff --git a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java index 16e540e95..2ac9fff1b 100644 --- a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java +++ b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java @@ -11,7 +11,9 @@ import org.briarproject.bramble.api.identity.IdentityManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; +import org.briarproject.bramble.api.sync.event.MessagesSentEvent; import org.briarproject.bramble.test.TestDatabaseConfigModule; import org.briarproject.bramble.test.TestTransportConnectionReader; import org.briarproject.bramble.test.TestTransportConnectionWriter; @@ -71,7 +73,16 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { } @Test - public void testWriteAndRead() throws Exception { + public void testWriteAndReadWithLazyRetransmission() throws Exception { + testWriteAndRead(false); + } + + @Test + public void testWriteAndReadWithEagerRetransmission() throws Exception { + testWriteAndRead(true); + } + + private void testWriteAndRead(boolean eager) throws Exception { // Create the identities Identity aliceIdentity = alice.getIdentityManager().createIdentity("Alice"); @@ -86,16 +97,21 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { bob.getEventBus().addListener(listener); // Alice sends a private message to Bob sendMessage(alice, bobId); - // Sync Alice's client versions and transport properties - read(bob, write(alice, bobId), 2); - // Sync Bob's client versions and transport properties - read(alice, write(bob, aliceId), 2); - // Sync the private message and the attachment - read(bob, write(alice, bobId), 2); + // Sync Alice's client versions + read(bob, write(alice, bobId, eager, 1), 1); + // Sync Bob's client versions + read(alice, write(bob, aliceId, eager, 1), 1); + // Sync Alice's second client versioning update (with the active flag + // raised), the private message and the attachment + read(bob, write(alice, bobId, eager, 3), 3); // Bob should have received the private message assertTrue(listener.messageAdded); // Bob should have received the attachment assertTrue(listener.attachmentAdded); + // Sync messages from Alice to Bob again. If using eager + // retransmission, the three unacked messages should be sent again. + // They're all duplicates, so no further deliveries should occur + read(bob, write(alice, bobId, eager, eager ? 3 : 0), 0); } private ContactId setUp(SimplexMessagingIntegrationTestComponent device, @@ -149,15 +165,24 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { } private byte[] write(SimplexMessagingIntegrationTestComponent device, - ContactId contactId) throws Exception { + ContactId contactId, boolean eager, int transmissions) + throws Exception { + // Listen for message transmissions + MessageTransmissionListener listener = + new MessageTransmissionListener(transmissions); + device.getEventBus().addListener(listener); // Write the outgoing stream ByteArrayOutputStream out = new ByteArrayOutputStream(); TestTransportConnectionWriter writer = - new TestTransportConnectionWriter(out); + new TestTransportConnectionWriter(out, eager); device.getConnectionManager().manageOutgoingConnection(contactId, SIMPLEX_TRANSPORT_ID, writer); // Wait for the writer to be disposed writer.getDisposedLatch().await(TIMEOUT_MS, MILLISECONDS); + // Check that the expected number of messages were sent + assertTrue(listener.sent.await(TIMEOUT_MS, MILLISECONDS)); + // Clean up the listener + device.getEventBus().removeListener(listener); // Return the contents of the stream return out.toByteArray(); } @@ -178,6 +203,24 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { deleteTestDirectory(testDir); } + @NotNullByDefault + private static class MessageTransmissionListener implements EventListener { + + private final CountDownLatch sent; + + private MessageTransmissionListener(int transmissions) { + sent = new CountDownLatch(transmissions); + } + + @Override + public void eventOccurred(Event e) { + if (e instanceof MessagesSentEvent) { + MessagesSentEvent m = (MessagesSentEvent) e; + for (MessageId ignored : m.getMessageIds()) sent.countDown(); + } + } + } + @NotNullByDefault private static class MessageDeliveryListener implements EventListener { @@ -191,7 +234,9 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { public void eventOccurred(Event e) { if (e instanceof MessageStateChangedEvent) { MessageStateChangedEvent m = (MessageStateChangedEvent) e; - if (m.getState().equals(DELIVERED)) delivered.countDown(); + if (!m.isLocal() && m.getState().equals(DELIVERED)) { + delivered.countDown(); + } } } } diff --git a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java index 0bcdab2a9..e493b4fa5 100644 --- a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java +++ b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java @@ -429,7 +429,7 @@ public abstract class BriarIntegrationTest